claude_code/client/setup_token/
session.rs1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4 io::AsyncWriteExt,
5 sync::{oneshot, Mutex},
6 time,
7};
8
9use crate::{ClaudeCodeError, CommandOutput};
10
11use super::{process::SetupTokenProcess, url::extract_oauth_url};
12
13#[cfg(unix)]
14use super::pty::portable_exit_status_to_std;
15
16const CLEANUP_GRACE: Duration = Duration::from_secs(2);
17
18async fn join_or_abort<T>(mut handle: tokio::task::JoinHandle<T>, grace: Duration) -> Option<T> {
19 if grace.is_zero() {
20 handle.abort();
21 let _ = handle.await;
22 return None;
23 }
24
25 tokio::select! {
26 output = &mut handle => output.ok(),
27 _ = time::sleep(grace) => {
28 handle.abort();
29 let _ = handle.await;
30 None
31 }
32 }
33}
34
35pub struct ClaudeSetupTokenSession {
36 url: String,
37 url_rx: Option<oneshot::Receiver<String>>,
38 process: Option<SetupTokenProcess>,
39 stdout_buf: Arc<Mutex<Vec<u8>>>,
40 stderr_buf: Arc<Mutex<Vec<u8>>>,
41 stdout_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
42 stderr_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
43 timeout: Option<Duration>,
44}
45
46impl std::fmt::Debug for ClaudeSetupTokenSession {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("ClaudeSetupTokenSession")
49 .field("url", &self.url)
50 .field("timeout", &self.timeout)
51 .finish_non_exhaustive()
52 }
53}
54
55impl ClaudeSetupTokenSession {
56 pub(super) fn new(
57 process: SetupTokenProcess,
58 stdout_buf: Arc<Mutex<Vec<u8>>>,
59 stderr_buf: Arc<Mutex<Vec<u8>>>,
60 stdout_task: tokio::task::JoinHandle<Result<(), ClaudeCodeError>>,
61 stderr_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
62 url_rx: oneshot::Receiver<String>,
63 timeout: Option<Duration>,
64 ) -> Self {
65 Self {
66 url: String::new(),
67 url_rx: Some(url_rx),
68 process: Some(process),
69 stdout_buf,
70 stderr_buf,
71 stdout_task: Some(stdout_task),
72 stderr_task,
73 timeout,
74 }
75 }
76
77 pub fn url(&self) -> &str {
78 &self.url
79 }
80
81 pub async fn wait_for_url(
82 &mut self,
83 timeout: Duration,
84 ) -> Result<Option<&str>, ClaudeCodeError> {
85 if !self.url.is_empty() {
86 return Ok(Some(self.url.as_str()));
87 }
88
89 let deadline = time::Instant::now() + timeout;
90 let poll_interval = Duration::from_millis(25);
91
92 loop {
93 if !self.url.is_empty() {
94 return Ok(Some(self.url.as_str()));
95 }
96
97 if let Some(url) = self.extract_url_from_captured_output().await {
100 self.url = url;
101 self.url_rx = None;
102 return Ok(Some(self.url.as_str()));
103 }
104
105 if time::Instant::now() >= deadline {
106 self.url_rx = None;
107 return Ok(None);
108 }
109
110 let Some(rx) = self.url_rx.as_mut() else {
111 time::sleep(poll_interval).await;
112 continue;
113 };
114
115 let remaining = deadline.saturating_duration_since(time::Instant::now());
116 let nap = std::cmp::min(poll_interval, remaining);
117
118 match time::timeout(nap, rx).await {
119 Ok(Ok(url)) => {
120 self.url = url;
121 self.url_rx = None;
122 return Ok(Some(self.url.as_str()));
123 }
124 Ok(Err(_closed)) => {
125 self.url_rx = None;
126 }
128 Err(_timeout) => {
129 }
131 }
132 }
133 }
134
135 async fn extract_url_from_captured_output(&self) -> Option<String> {
136 let mut combined = Vec::new();
137 combined.extend_from_slice(&self.stdout_buf.lock().await);
138 combined.extend_from_slice(&self.stderr_buf.lock().await);
139 let text = String::from_utf8_lossy(&combined);
140 extract_oauth_url(&text)
141 }
142
143 pub async fn submit_code(mut self, code: &str) -> Result<CommandOutput, ClaudeCodeError> {
144 let process = self
145 .process
146 .as_mut()
147 .expect("setup-token session process present");
148
149 match process {
150 SetupTokenProcess::Pipes { stdin, .. } => {
151 if let Some(mut stdin) = stdin.take() {
152 let mut bytes = code.as_bytes().to_vec();
153 if !bytes.ends_with(b"\n") {
154 bytes.push(b'\n');
155 }
156 stdin
157 .write_all(&bytes)
158 .await
159 .map_err(ClaudeCodeError::StdinWrite)?;
160 }
161 }
162 #[cfg(unix)]
163 SetupTokenProcess::Pty { writer, .. } => {
164 if let Some(mut writer) = writer.take() {
165 let mut bytes = code.as_bytes().to_vec();
166 if !bytes.ends_with(b"\n") {
167 bytes.push(b'\n');
168 }
169
170 tokio::task::spawn_blocking(move || {
171 writer.write_all(&bytes)?;
172 writer.flush()
173 })
174 .await
175 .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
176 .map_err(ClaudeCodeError::StdinWrite)?;
177 }
178 }
179 };
180 self.wait().await
181 }
182
183 #[cfg(unix)]
184 async fn reap_pty_until(
185 child: &mut Box<dyn portable_pty::Child + Send + Sync>,
186 deadline: time::Instant,
187 poll_interval: Duration,
188 ) -> Option<std::process::ExitStatus> {
189 loop {
190 match child.try_wait() {
191 Ok(Some(exit)) => return Some(portable_exit_status_to_std(exit)),
192 Ok(None) => {
193 if time::Instant::now() >= deadline {
194 return None;
195 }
196 time::sleep(poll_interval).await;
197 }
198 Err(_) => return None,
199 }
200 }
201 }
202
203 pub async fn wait(mut self) -> Result<CommandOutput, ClaudeCodeError> {
204 let process = self
205 .process
206 .take()
207 .expect("setup-token session process present");
208
209 let timeout = self.timeout;
210 let mut status: Option<std::process::ExitStatus> = None;
211 let mut timed_out: Option<Duration> = None;
212 let mut wait_error: Option<ClaudeCodeError> = None;
213 let mut cleanup_deadline: Option<time::Instant> = None;
214
215 match process {
216 SetupTokenProcess::Pipes { mut child, .. } => {
217 if let Some(dur) = timeout {
218 match time::timeout(dur, child.wait()).await {
219 Ok(Ok(exit)) => {
220 status = Some(exit);
221 }
222 Ok(Err(source)) => {
223 wait_error = Some(ClaudeCodeError::Wait(source));
224 }
225 Err(_) => {
226 timed_out = Some(dur);
227 }
228 }
229 } else {
230 match child.wait().await {
231 Ok(exit) => {
232 status = Some(exit);
233 }
234 Err(source) => {
235 wait_error = Some(ClaudeCodeError::Wait(source));
236 }
237 }
238 }
239
240 if timed_out.is_some() || wait_error.is_some() {
241 let deadline =
242 cleanup_deadline.get_or_insert(time::Instant::now() + CLEANUP_GRACE);
243 let _ = child.start_kill();
244 let remaining = deadline.saturating_duration_since(time::Instant::now());
245 let _ = time::timeout(remaining, child.wait()).await;
246 }
247 }
248 #[cfg(unix)]
249 SetupTokenProcess::Pty { mut child, .. } => {
250 let poll_interval = Duration::from_millis(50);
251 let user_deadline = timeout.map(|dur| time::Instant::now() + dur);
252 loop {
253 match child.try_wait() {
254 Ok(Some(exit)) => {
255 status = Some(portable_exit_status_to_std(exit));
256 break;
257 }
258 Ok(None) => {
259 if let Some(deadline) = user_deadline {
260 if time::Instant::now() >= deadline {
261 timed_out = timeout;
262 let deadline = cleanup_deadline
263 .get_or_insert(time::Instant::now() + CLEANUP_GRACE);
264 let _ = child.kill();
265 if let Some(exit) =
266 Self::reap_pty_until(&mut child, *deadline, poll_interval)
267 .await
268 {
269 status = Some(exit);
270 }
271 break;
272 }
273 }
274 time::sleep(poll_interval).await;
275 }
276 Err(source) => {
277 wait_error = Some(ClaudeCodeError::Wait(source));
278 let deadline = cleanup_deadline
279 .get_or_insert(time::Instant::now() + CLEANUP_GRACE);
280 let _ = child.kill();
281 if let Some(exit) =
282 Self::reap_pty_until(&mut child, *deadline, poll_interval).await
283 {
284 status = Some(exit);
285 }
286 break;
287 }
288 }
289 }
290 }
291 };
292
293 let cleanup_deadline = cleanup_deadline.unwrap_or_else(time::Instant::now);
294 let failure = timed_out.is_some() || wait_error.is_some();
295
296 if failure {
297 if let Some(task) = self.stdout_task.take() {
298 let remaining = cleanup_deadline.saturating_duration_since(time::Instant::now());
299 let _ = join_or_abort(task, remaining).await;
300 }
301 if let Some(task) = self.stderr_task.take() {
302 let remaining = cleanup_deadline.saturating_duration_since(time::Instant::now());
303 let _ = join_or_abort(task, remaining).await;
304 }
305 } else {
306 if let Some(task) = self.stdout_task.take() {
307 task.await
308 .map_err(|e| ClaudeCodeError::Join(e.to_string()))??;
309 }
310 if let Some(task) = self.stderr_task.take() {
311 task.await
312 .map_err(|e| ClaudeCodeError::Join(e.to_string()))??;
313 }
314 }
315
316 let stdout = self.stdout_buf.lock().await.clone();
317 let stderr = self.stderr_buf.lock().await.clone();
318
319 if let Some(err) = wait_error {
320 return Err(err);
321 }
322 if let Some(dur) = timed_out {
323 return Err(ClaudeCodeError::Timeout { timeout: dur });
324 }
325
326 Ok(CommandOutput {
327 status: status.expect("setup-token wait should set status when no error is present"),
328 stdout,
329 stderr,
330 })
331 }
332}
333
334impl Drop for ClaudeSetupTokenSession {
335 fn drop(&mut self) {
336 let Some(process) = self.process.as_mut() else {
339 return;
340 };
341
342 match process {
343 SetupTokenProcess::Pipes { child, .. } => {
344 if child.id().is_some() {
345 let _ = child.start_kill();
346 }
347 }
348 #[cfg(unix)]
349 SetupTokenProcess::Pty { child, .. } => {
350 let _ = child.kill();
351 }
352 }
353 }
354}