1use crate::{
4 errors::{Result, SdkError},
5 transport::{InputMessage, SubprocessTransport, Transport},
6 types::{ClaudeCodeOptions, ControlRequest, Message},
7};
8use futures::StreamExt;
9use std::sync::Arc;
10use tokio::sync::Mutex;
11use tracing::{debug, info};
12
13pub struct InteractiveClient {
18 transport: Arc<Mutex<SubprocessTransport>>,
19 connected: bool,
20}
21
22impl InteractiveClient {
23 pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
25 unsafe {
26 std::env::set_var("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
27 }
28 let transport = SubprocessTransport::new(options)?;
29 Ok(Self {
30 transport: Arc::new(Mutex::new(transport)),
31 connected: false,
32 })
33 }
34
35 pub async fn connect(&mut self) -> Result<()> {
37 if self.connected {
38 return Ok(());
39 }
40
41 let mut transport = self.transport.lock().await;
42 transport.connect().await?;
43 drop(transport); self.connected = true;
46 info!("Connected to Claude CLI");
47 Ok(())
48 }
49
50 pub async fn send_and_receive(&mut self, prompt: String) -> Result<Vec<Message>> {
52 if !self.connected {
53 return Err(SdkError::InvalidState {
54 message: "Not connected".into(),
55 });
56 }
57
58 {
60 let mut transport = self.transport.lock().await;
61 let message = InputMessage::user(prompt, "default".to_string());
62 transport.send_message(message).await?;
63 } debug!("Message sent, waiting for response");
66
67 let mut messages = Vec::new();
69 loop {
70 let msg_result = {
72 let mut transport = self.transport.lock().await;
73 let mut stream = transport.receive_messages();
74 stream.next().await
75 }; if let Some(result) = msg_result {
79 match result {
80 Ok(msg) => {
81 debug!("Received: {:?}", msg);
82 let is_result = matches!(msg, Message::Result { .. });
83 messages.push(msg);
84 if is_result {
85 break;
86 }
87 }
88 Err(e) => return Err(e),
89 }
90 } else {
91 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
93 }
94 }
95
96 Ok(messages)
97 }
98
99 pub async fn send_message(&mut self, prompt: String) -> Result<()> {
101 if !self.connected {
102 return Err(SdkError::InvalidState {
103 message: "Not connected".into(),
104 });
105 }
106
107 let mut transport = self.transport.lock().await;
108 let message = InputMessage::user(prompt, "default".to_string());
109 transport.send_message(message).await?;
110 drop(transport);
111
112 debug!("Message sent");
113 Ok(())
114 }
115
116 pub async fn receive_response(&mut self) -> Result<Vec<Message>> {
118 if !self.connected {
119 return Err(SdkError::InvalidState {
120 message: "Not connected".into(),
121 });
122 }
123
124 let mut messages = Vec::new();
125 loop {
126 let msg_result = {
128 let mut transport = self.transport.lock().await;
129 let mut stream = transport.receive_messages();
130 stream.next().await
131 }; if let Some(result) = msg_result {
135 match result {
136 Ok(msg) => {
137 debug!("Received: {:?}", msg);
138 let is_result = matches!(msg, Message::Result { .. });
139 messages.push(msg);
140 if is_result {
141 break;
142 }
143 }
144 Err(e) => return Err(e),
145 }
146 } else {
147 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
149 }
150 }
151
152 Ok(messages)
153 }
154
155 pub async fn interrupt(&mut self) -> Result<()> {
157 if !self.connected {
158 return Err(SdkError::InvalidState {
159 message: "Not connected".into(),
160 });
161 }
162
163 let mut transport = self.transport.lock().await;
164 let request = ControlRequest::Interrupt {
165 request_id: uuid::Uuid::new_v4().to_string(),
166 };
167 transport.send_control_request(request).await?;
168 drop(transport);
169
170 info!("Interrupt sent");
171 Ok(())
172 }
173
174 pub async fn disconnect(&mut self) -> Result<()> {
176 if !self.connected {
177 return Ok(());
178 }
179
180 let mut transport = self.transport.lock().await;
181 transport.disconnect().await?;
182 drop(transport);
183
184 self.connected = false;
185 info!("Disconnected from Claude CLI");
186 Ok(())
187 }
188}