1use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use claude_code::{
11 ClaudeAgentOptions, ClaudeSdkClient, Error as ClaudeError, InputPrompt, Message, Prompt,
12 SubprocessCliTransport,
13};
14use tokio::sync::Mutex;
15
16use crate::error::{ExecutorError, Result};
17use crate::executor::{AgentCapabilities, AgentExecutor, AvailabilityStatus, SpawnConfig};
18use crate::session::{AgentSession, SessionMetadata};
19use crate::types::{ExecutorType, PermissionPolicy};
20
/// Session id passed to the SDK when a fresh session is spawned; also used as the
/// prefix of generated fallback session ids.
const DEFAULT_SESSION_ID: &str = "default";
/// Upper bound on the number of SDK clients kept in the executor's session map.
const MAX_TRACKED_SESSIONS: usize = 64;
/// Process-wide sequence number mixed into fallback session ids to keep them distinct.
static FALLBACK_SESSION_COUNTER: AtomicU64 = AtomicU64::new(1);
24
/// Executor that runs prompts through the Claude Code SDK client.
///
/// Clones share the same session map through the `Arc`.
#[derive(Clone)]
pub struct ClaudeCodeExecutor {
    /// Options cloned as the starting point for every spawned or resumed session.
    base_options: ClaudeAgentOptions,
    /// SDK clients kept after a successful spawn/resume, keyed by session id.
    sessions: Arc<Mutex<HashMap<String, ClaudeSdkClient>>>,
}
46
47impl ClaudeCodeExecutor {
48 pub fn new() -> Self {
50 Self::with_options(ClaudeAgentOptions::default())
51 }
52
53 pub fn with_options(options: ClaudeAgentOptions) -> Self {
57 Self {
58 base_options: options,
59 sessions: Arc::new(Mutex::new(HashMap::new())),
60 }
61 }
62
63 fn build_options(
64 &self,
65 working_dir: &Path,
66 config: &SpawnConfig,
67 resume_session: Option<&str>,
68 continue_conversation: bool,
69 fork_session: bool,
70 ) -> ClaudeAgentOptions {
71 let mut options = self.base_options.clone();
72 options.cwd = Some(working_dir.to_path_buf());
73
74 if config.model.is_some() {
75 options.model = config.model.clone();
76 }
77 if config.reasoning.is_some() {
78 options.effort = config.reasoning.clone();
79 }
80 if let Some(mode) = map_permission_policy(config.permission_policy) {
81 options.permission_mode = Some(mode);
82 }
83
84 options.env.extend(config.env.iter().cloned());
85 options.continue_conversation = continue_conversation;
86 options.resume = resume_session.map(str::to_owned);
87 options.fork_session = fork_session;
88 options
89 }
90
91 async fn store_client(&self, session_id: String, client: ClaudeSdkClient) -> Result<()> {
92 let old_client = {
93 let mut sessions = self.sessions.lock().await;
94 sessions.remove(&session_id)
95 };
96
97 if let Some(mut old_client) = old_client
98 && let Err(error) = old_client.disconnect().await
99 {
100 let mut sessions = self.sessions.lock().await;
101 sessions.entry(session_id).or_insert(old_client);
102 return Err(map_claude_error(
103 "failed to disconnect replaced claude session",
104 error,
105 ));
106 }
107
108 let mut sessions = self.sessions.lock().await;
109 let current_session_id = session_id.clone();
110 sessions.insert(current_session_id.clone(), client);
111 let evicted_client = if sessions.len() > MAX_TRACKED_SESSIONS {
112 let evicted_session_id = sessions
113 .keys()
114 .find(|session_id| *session_id != ¤t_session_id)
115 .cloned()
116 .unwrap_or_else(|| DEFAULT_SESSION_ID.to_string());
117 sessions.remove(&evicted_session_id)
118 } else {
119 None
120 };
121 drop(sessions);
122
123 if let Some(mut evicted_client) = evicted_client {
124 let _ = evicted_client.disconnect().await;
125 }
126
127 Ok(())
128 }
129
130 fn to_agent_session(
131 &self,
132 session_id: String,
133 working_dir: &Path,
134 context_window_override_tokens: Option<u32>,
135 ) -> AgentSession {
136 AgentSession::from_metadata_with_exit_status(
137 SessionMetadata {
138 session_id,
139 executor_type: ExecutorType::ClaudeCode,
140 working_dir: working_dir.to_path_buf(),
141 created_at: Utc::now(),
142 last_message_id: None,
143 context_window_override_tokens,
144 },
145 crate::types::ExitStatus {
146 code: None,
147 success: true,
148 },
149 )
150 }
151}
152
153impl Default for ClaudeCodeExecutor {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159#[async_trait]
160impl AgentExecutor for ClaudeCodeExecutor {
161 fn executor_type(&self) -> ExecutorType {
162 ExecutorType::ClaudeCode
163 }
164
165 async fn spawn(
166 &self,
167 working_dir: &Path,
168 prompt: &str,
169 config: &SpawnConfig,
170 ) -> Result<AgentSession> {
171 let options = self.build_options(working_dir, config, None, false, false);
172 let mut client = ClaudeSdkClient::new(Some(options), None);
173
174 client
175 .connect(None)
176 .await
177 .map_err(|error| map_claude_error("failed to connect claude sdk client", error))?;
178 client
179 .query(InputPrompt::Text(prompt.to_owned()), DEFAULT_SESSION_ID)
180 .await
181 .map_err(|error| {
182 map_claude_error("failed to send prompt to claude sdk client", error)
183 })?;
184
185 let messages = client.receive_response().await.map_err(|error| {
186 map_claude_error("failed to receive response from claude sdk client", error)
187 })?;
188 ensure_query_succeeded(&messages, "failed to execute prompt in claude sdk session")?;
189 let session_id = extract_session_id(&messages).unwrap_or_else(unique_fallback_session_id);
190
191 self.store_client(session_id.clone(), client).await?;
192 Ok(self.to_agent_session(
193 session_id,
194 working_dir,
195 config.context_window_override_tokens,
196 ))
197 }
198
199 async fn resume(
200 &self,
201 working_dir: &Path,
202 prompt: &str,
203 session_id: &str,
204 reset_to: Option<&str>,
205 config: &SpawnConfig,
206 ) -> Result<AgentSession> {
207 let options = self.build_options(working_dir, config, Some(session_id), true, false);
208 let mut client = ClaudeSdkClient::new(Some(options), None);
209
210 client
211 .connect(None)
212 .await
213 .map_err(|error| map_claude_error("failed to connect claude sdk client", error))?;
214
215 if let Some(reset_to) = reset_to {
216 client.rewind_files(reset_to).await.map_err(|error| {
217 map_claude_error("failed to rewind claude session files", error)
218 })?;
219 }
220
221 client
222 .query(InputPrompt::Text(prompt.to_owned()), session_id)
223 .await
224 .map_err(|error| {
225 map_claude_error("failed to send prompt to resumed claude session", error)
226 })?;
227
228 let messages = client.receive_response().await.map_err(|error| {
229 map_claude_error(
230 "failed to receive response from resumed claude session",
231 error,
232 )
233 })?;
234 ensure_query_succeeded(
235 &messages,
236 "failed to execute prompt in resumed claude session",
237 )?;
238 let resumed_session_id =
239 extract_session_id(&messages).unwrap_or_else(|| session_id.to_string());
240
241 self.store_client(resumed_session_id.clone(), client)
242 .await?;
243 Ok(self.to_agent_session(
244 resumed_session_id,
245 working_dir,
246 config.context_window_override_tokens,
247 ))
248 }
249
250 fn capabilities(&self) -> AgentCapabilities {
251 AgentCapabilities {
252 session_fork: true,
253 context_usage: true,
254 mcp_support: true,
255 structured_output: true,
256 }
257 }
258
259 fn availability(&self) -> AvailabilityStatus {
260 match resolve_cli_path(&self.base_options) {
261 Ok(path) => AvailabilityStatus {
262 available: true,
263 reason: Some(format!("Claude CLI found at {}", path.display())),
264 },
265 Err(reason) => AvailabilityStatus {
266 available: false,
267 reason: Some(reason),
268 },
269 }
270 }
271}
272
273fn map_permission_policy(policy: Option<PermissionPolicy>) -> Option<claude_code::PermissionMode> {
274 match policy {
275 Some(PermissionPolicy::Bypass) => Some(claude_code::PermissionMode::BypassPermissions),
276 Some(PermissionPolicy::Prompt) => Some(claude_code::PermissionMode::Default),
277 Some(PermissionPolicy::Deny) => Some(claude_code::PermissionMode::Plan),
278 None => None,
279 }
280}
281
282fn extract_session_id(messages: &[Message]) -> Option<String> {
283 messages.iter().rev().find_map(|message| match message {
284 Message::Result(result) => Some(result.session_id.clone()),
285 Message::StreamEvent(event) => Some(event.session_id.clone()),
286 _ => None,
287 })
288}
289
290fn unique_fallback_session_id() -> String {
291 let sequence = FALLBACK_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
292 let timestamp_nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
293 format!("{DEFAULT_SESSION_ID}-{timestamp_nanos}-{sequence}")
294}
295
296fn ensure_query_succeeded(messages: &[Message], context: &str) -> Result<()> {
297 let Some(result) = messages.iter().rev().find_map(|message| match message {
298 Message::Result(result) => Some(result),
299 _ => None,
300 }) else {
301 return Err(ExecutorError::execution_failed(
302 context,
303 "claude query completed without a terminal result message",
304 ));
305 };
306
307 if result.is_error {
308 let detail = result
309 .result
310 .as_deref()
311 .unwrap_or("claude query returned an error result");
312 return Err(ExecutorError::execution_failed(context, detail));
313 }
314
315 Ok(())
316}
317
318fn map_claude_error(context: &str, error: ClaudeError) -> ExecutorError {
319 match error {
320 ClaudeError::CLINotFound(err) => ExecutorError::unavailable(context, err),
321 ClaudeError::CLIConnection(err) => ExecutorError::spawn_failed(context, err),
322 ClaudeError::Io(err) => {
323 ExecutorError::Io(std::io::Error::new(err.kind(), format!("{context}: {err}")))
324 }
325 ClaudeError::Process(err) => ExecutorError::execution_failed(context, err),
326 ClaudeError::CLIJSONDecode(err) => ExecutorError::execution_failed(context, err),
327 ClaudeError::MessageParse(err) => ExecutorError::execution_failed(context, err),
328 ClaudeError::Json(err) => ExecutorError::Serialization(err),
329 ClaudeError::ClaudeSDK(err) => ExecutorError::invalid_config(context, err),
330 ClaudeError::Other(msg) => ExecutorError::other(context, msg),
331 }
332}
333
334fn resolve_cli_path(options: &ClaudeAgentOptions) -> std::result::Result<PathBuf, String> {
335 if let Some(cli_path) = &options.cli_path {
336 return validate_cli_path(cli_path, "configured Claude CLI path");
337 }
338
339 let transport = SubprocessCliTransport::new(Prompt::Messages, options.clone())
340 .map_err(|error| error.to_string())?;
341 let resolved = PathBuf::from(&transport.cli_path);
342 validate_cli_path(&resolved, "Claude CLI path resolved by SDK")
343}
344
345fn validate_cli_path(path: &Path, label: &str) -> std::result::Result<PathBuf, String> {
346 if !path.exists() {
347 return Err(format!("{label} does not exist: {}", path.display()));
348 }
349 if !path.is_file() {
350 return Err(format!(
351 "{label} must point to an executable file: {}",
352 path.display()
353 ));
354 }
355 if !is_executable_file(path) {
356 return Err(format!("{label} is not executable: {}", path.display()));
357 }
358 Ok(path.to_path_buf())
359}
360
/// Reports whether `path` can be treated as an executable.
///
/// On Unix this is true when the metadata can be read and any execute bit
/// (owner, group or other) is set. On other platforms it is true for any path
/// that is a file.
fn is_executable_file(path: &Path) -> bool {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        match path.metadata() {
            Ok(metadata) => (metadata.permissions().mode() & 0o111) != 0,
            // Unreadable metadata counts as not executable.
            Err(_) => false,
        }
    }
    #[cfg(not(unix))]
    {
        path.is_file()
    }
}
374
#[cfg(test)]
mod tests {
    use super::*;
    use claude_code::ResultMessage;
    use std::fs::{self, File};
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a terminal result message with the given error flag and result text.
    fn result_message(is_error: bool, result: Option<&str>) -> Message {
        let subtype = if is_error { "error" } else { "success" };
        Message::Result(ResultMessage {
            subtype: subtype.to_string(),
            duration_ms: 1,
            duration_api_ms: 1,
            is_error,
            num_turns: 1,
            session_id: "session-1".to_string(),
            stop_reason: None,
            total_cost_usd: None,
            usage: None,
            result: result.map(String::from),
            structured_output: None,
        })
    }

    /// Returns a path under the system temp directory that is unique per process
    /// and per call time. Nothing is created on disk.
    fn new_temp_path(prefix: &str) -> PathBuf {
        let pid = std::process::id();
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should move forward")
            .as_nanos();
        let file_name = format!("{prefix}-{}-{}", pid, nanos);
        std::env::temp_dir().join(file_name)
    }

    #[test]
    fn fallback_session_ids_are_unique() {
        let ids = [unique_fallback_session_id(), unique_fallback_session_id()];

        assert_ne!(ids[0], ids[1]);
        for id in &ids {
            assert!(id.starts_with(DEFAULT_SESSION_ID));
        }
    }

    #[test]
    fn query_success_check_rejects_error_result_messages() {
        let messages = [result_message(true, Some("permission denied"))];

        let outcome = ensure_query_succeeded(&messages, "spawn failed");
        let error = outcome.expect_err("error result message should fail");
        assert!(error.to_string().contains("permission denied"));
    }

    #[test]
    fn query_success_check_accepts_success_result_messages() {
        let messages = [result_message(false, Some("ok"))];
        assert!(matches!(
            ensure_query_succeeded(&messages, "spawn failed"),
            Ok(())
        ));
    }

    #[test]
    fn query_success_check_rejects_missing_result_messages() {
        let messages: [Message; 0] = [];

        let outcome = ensure_query_succeeded(&messages, "spawn failed");
        let error = outcome.expect_err("missing terminal result message should fail");
        let rendered = error.to_string();
        assert!(rendered.contains("without a terminal result message"));
    }

    #[test]
    fn resolve_cli_path_rejects_directory_override() {
        let temp_dir = new_temp_path("claude-cli-dir");
        fs::create_dir_all(&temp_dir).expect("directory should be created");

        let mut options = ClaudeAgentOptions::default();
        options.cli_path = Some(temp_dir.clone());

        let reason = resolve_cli_path(&options).expect_err("directory should be rejected");
        assert!(reason.contains("must point to an executable file"));

        let _ = fs::remove_dir_all(temp_dir);
    }

    #[test]
    fn resolve_cli_path_accepts_executable_file_override() {
        let temp_file = new_temp_path("claude-cli-file");
        File::create(&temp_file).expect("file should be created");
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            let mut permissions = fs::metadata(&temp_file)
                .expect("metadata should exist")
                .permissions();
            permissions.set_mode(0o755);
            fs::set_permissions(&temp_file, permissions).expect("permissions should be set");
        }

        let mut options = ClaudeAgentOptions::default();
        options.cli_path = Some(temp_file.clone());

        let resolved = resolve_cli_path(&options).expect("file should be accepted");
        assert_eq!(resolved, temp_file);

        let _ = fs::remove_file(temp_file);
    }
}