cc_sdk/
interactive.rs

1//! Working interactive client implementation
2
3use crate::{
4    errors::{Result, SdkError},
5    transport::{InputMessage, SubprocessTransport, Transport},
6    types::{ClaudeCodeOptions, ControlRequest, Message},
7};
8use futures::{Stream, StreamExt};
9use std::sync::Arc;
10use tokio::sync::Mutex;
11use tokio_stream::wrappers::ReceiverStream;
12use tracing::{debug, info};
13
14/// Interactive client for stateful conversations with Claude
15///
16/// This is the recommended client for interactive use. It provides a clean API
17/// that matches the Python SDK's functionality.
18pub struct InteractiveClient {
19    transport: Arc<Mutex<Box<dyn Transport + Send>>>,
20    connected: bool,
21}
22
23impl InteractiveClient {
24    /// Create a new client
25    pub fn new(options: ClaudeCodeOptions) -> Result<Self> {
26        unsafe {
27            std::env::set_var("CLAUDE_CODE_ENTRYPOINT", "sdk-rust");
28        }
29        let transport: Box<dyn Transport + Send> = Box::new(SubprocessTransport::new(options)?);
30        Ok(Self {
31            transport: Arc::new(Mutex::new(transport)),
32            connected: false,
33        })
34    }
35
36    /// Connect to Claude
37    pub async fn connect(&mut self) -> Result<()> {
38        if self.connected {
39            return Ok(());
40        }
41
42        let mut transport = self.transport.lock().await;
43        transport.connect().await?;
44        drop(transport); // Release lock immediately
45
46        self.connected = true;
47        info!("Connected to Claude CLI");
48        Ok(())
49    }
50
51    /// Send a message and receive all messages until Result message
52    pub async fn send_and_receive(&mut self, prompt: String) -> Result<Vec<Message>> {
53        if !self.connected {
54            return Err(SdkError::InvalidState {
55                message: "Not connected".into(),
56            });
57        }
58
59        // Send message
60        {
61            let mut transport = self.transport.lock().await;
62            let message = InputMessage::user(prompt, "default".to_string());
63            transport.send_message(message).await?;
64        } // Lock released here
65
66        debug!("Message sent, waiting for response");
67
68        // Receive messages
69        let mut messages = Vec::new();
70        loop {
71            // Try to get a message
72            let msg_result = {
73                let mut transport = self.transport.lock().await;
74                let mut stream = transport.receive_messages();
75                stream.next().await
76            }; // Lock released here
77
78            // Process the message
79            if let Some(result) = msg_result {
80                match result {
81                    Ok(msg) => {
82                        debug!("Received: {:?}", msg);
83                        let is_result = matches!(msg, Message::Result { .. });
84                        messages.push(msg);
85                        if is_result {
86                            break;
87                        }
88                    }
89                    Err(e) => return Err(e),
90                }
91            } else {
92                // No more messages, wait a bit
93                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
94            }
95        }
96
97        Ok(messages)
98    }
99
100    /// Send a message without waiting for response
101    pub async fn send_message(&mut self, prompt: String) -> Result<()> {
102        if !self.connected {
103            return Err(SdkError::InvalidState {
104                message: "Not connected".into(),
105            });
106        }
107
108        let mut transport = self.transport.lock().await;
109        let message = InputMessage::user(prompt, "default".to_string());
110        transport.send_message(message).await?;
111        drop(transport);
112
113        debug!("Message sent");
114        Ok(())
115    }
116
117    /// Receive messages until Result message (convenience method like Python SDK)
118    pub async fn receive_response(&mut self) -> Result<Vec<Message>> {
119        if !self.connected {
120            return Err(SdkError::InvalidState {
121                message: "Not connected".into(),
122            });
123        }
124
125        let mut messages = Vec::new();
126        loop {
127            // Try to get a message
128            let msg_result = {
129            let mut transport = self.transport.lock().await;
130            let mut stream = transport.receive_messages();
131                stream.next().await
132            }; // Lock released here
133
134            // Process the message
135            if let Some(result) = msg_result {
136                match result {
137                    Ok(msg) => {
138                        debug!("Received: {:?}", msg);
139                        let is_result = matches!(msg, Message::Result { .. });
140                        messages.push(msg);
141                        if is_result {
142                            break;
143                        }
144                    }
145                    Err(e) => return Err(e),
146                }
147            } else {
148                // No more messages, wait a bit
149                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
150            }
151        }
152
153        Ok(messages)
154    }
155
156    /// Receive messages as a stream (streaming output support)
157    /// 
158    /// Returns a stream of messages that can be iterated over asynchronously.
159    /// This is similar to Python SDK's `receive_messages()` method.
160    /// 
161    /// # Example
162    /// 
163    /// ```rust,no_run
164    /// use cc_sdk::{InteractiveClient, ClaudeCodeOptions};
165    /// use futures::StreamExt;
166    /// 
167    /// #[tokio::main]
168    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
169    ///     let mut client = InteractiveClient::new(ClaudeCodeOptions::default())?;
170    ///     client.connect().await?;
171    ///     
172    ///     // Send a message
173    ///     client.send_message("Hello!".to_string()).await?;
174    ///     
175    ///     // Receive messages as a stream
176    ///     let mut stream = client.receive_messages_stream().await;
177    ///     while let Some(msg) = stream.next().await {
178    ///         match msg {
179    ///             Ok(message) => println!("Received: {:?}", message),
180    ///             Err(e) => eprintln!("Error: {}", e),
181    ///         }
182    ///     }
183    ///     
184    ///     Ok(())
185    /// }
186    /// ```
187    pub async fn receive_messages_stream(&mut self) -> impl Stream<Item = Result<Message>> + '_ {
188        // Create a channel for messages
189        let (tx, rx) = tokio::sync::mpsc::channel(100);
190        let transport = self.transport.clone();
191        
192        // Spawn a task to receive messages from transport
193        tokio::spawn(async move {
194            let mut transport = transport.lock().await;
195            let mut stream = transport.receive_messages();
196            
197            while let Some(result) = stream.next().await {
198                // Send each message through the channel
199                if tx.send(result).await.is_err() {
200                    // Receiver dropped, stop sending
201                    break;
202                }
203            }
204        });
205        
206        // Return the receiver as a stream
207        ReceiverStream::new(rx)
208    }
209
210    /// Receive messages as an async iterator until a Result message
211    /// 
212    /// This is a convenience method that collects messages until a Result message
213    /// is received, similar to Python SDK's `receive_response()`.
214    pub async fn receive_response_stream(&mut self) -> impl Stream<Item = Result<Message>> + '_ {
215        // Create a stream that stops after Result message
216        async_stream::stream! {
217            let mut stream = self.receive_messages_stream().await;
218            
219            while let Some(result) = stream.next().await {
220                match &result {
221                    Ok(msg) => {
222                        let is_result = matches!(msg, Message::Result { .. });
223                        yield result;
224                        if is_result {
225                            break;
226                        }
227                    }
228                    Err(_) => {
229                        yield result;
230                        break;
231                    }
232                }
233            }
234        }
235    }
236
237    /// Send interrupt signal to cancel current operation
238    pub async fn interrupt(&mut self) -> Result<()> {
239        if !self.connected {
240            return Err(SdkError::InvalidState {
241                message: "Not connected".into(),
242            });
243        }
244
245        let mut transport = self.transport.lock().await;
246        let request = ControlRequest::Interrupt {
247            request_id: uuid::Uuid::new_v4().to_string(),
248        };
249        transport.send_control_request(request).await?;
250        drop(transport);
251
252        info!("Interrupt sent");
253        Ok(())
254    }
255
256    /// Disconnect
257    pub async fn disconnect(&mut self) -> Result<()> {
258        if !self.connected {
259            return Ok(());
260        }
261
262        let mut transport = self.transport.lock().await;
263        transport.disconnect().await?;
264        drop(transport);
265
266        self.connected = false;
267        info!("Disconnected from Claude CLI");
268        Ok(())
269    }
270}