Skip to main content

claude_code/client/setup_token/
session.rs

1use std::{sync::Arc, time::Duration};
2
3use tokio::{
4    io::AsyncWriteExt,
5    sync::{oneshot, Mutex},
6    time,
7};
8
9use crate::{ClaudeCodeError, CommandOutput};
10
11use super::{process::SetupTokenProcess, url::extract_oauth_url};
12
13#[cfg(unix)]
14use super::pty::portable_exit_status_to_std;
15
16const CLEANUP_GRACE: Duration = Duration::from_secs(2);
17
18async fn join_or_abort<T>(mut handle: tokio::task::JoinHandle<T>, grace: Duration) -> Option<T> {
19    if grace.is_zero() {
20        handle.abort();
21        let _ = handle.await;
22        return None;
23    }
24
25    tokio::select! {
26        output = &mut handle => output.ok(),
27        _ = time::sleep(grace) => {
28            handle.abort();
29            let _ = handle.await;
30            None
31        }
32    }
33}
34
35pub struct ClaudeSetupTokenSession {
36    url: String,
37    url_rx: Option<oneshot::Receiver<String>>,
38    process: Option<SetupTokenProcess>,
39    stdout_buf: Arc<Mutex<Vec<u8>>>,
40    stderr_buf: Arc<Mutex<Vec<u8>>>,
41    stdout_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
42    stderr_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
43    timeout: Option<Duration>,
44}
45
46impl std::fmt::Debug for ClaudeSetupTokenSession {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("ClaudeSetupTokenSession")
49            .field("url", &self.url)
50            .field("timeout", &self.timeout)
51            .finish_non_exhaustive()
52    }
53}
54
55impl ClaudeSetupTokenSession {
56    pub(super) fn new(
57        process: SetupTokenProcess,
58        stdout_buf: Arc<Mutex<Vec<u8>>>,
59        stderr_buf: Arc<Mutex<Vec<u8>>>,
60        stdout_task: tokio::task::JoinHandle<Result<(), ClaudeCodeError>>,
61        stderr_task: Option<tokio::task::JoinHandle<Result<(), ClaudeCodeError>>>,
62        url_rx: oneshot::Receiver<String>,
63        timeout: Option<Duration>,
64    ) -> Self {
65        Self {
66            url: String::new(),
67            url_rx: Some(url_rx),
68            process: Some(process),
69            stdout_buf,
70            stderr_buf,
71            stdout_task: Some(stdout_task),
72            stderr_task,
73            timeout,
74        }
75    }
76
77    pub fn url(&self) -> &str {
78        &self.url
79    }
80
81    pub async fn wait_for_url(
82        &mut self,
83        timeout: Duration,
84    ) -> Result<Option<&str>, ClaudeCodeError> {
85        if !self.url.is_empty() {
86            return Ok(Some(self.url.as_str()));
87        }
88
89        let deadline = time::Instant::now() + timeout;
90        let poll_interval = Duration::from_millis(25);
91
92        loop {
93            if !self.url.is_empty() {
94                return Ok(Some(self.url.as_str()));
95            }
96
97            // Parse from the full captured output buffer (more robust than relying only on
98            // incremental chunk parsing, and avoids “TTY noise” issues).
99            if let Some(url) = self.extract_url_from_captured_output().await {
100                self.url = url;
101                self.url_rx = None;
102                return Ok(Some(self.url.as_str()));
103            }
104
105            if time::Instant::now() >= deadline {
106                self.url_rx = None;
107                return Ok(None);
108            }
109
110            let Some(rx) = self.url_rx.as_mut() else {
111                time::sleep(poll_interval).await;
112                continue;
113            };
114
115            let remaining = deadline.saturating_duration_since(time::Instant::now());
116            let nap = std::cmp::min(poll_interval, remaining);
117
118            match time::timeout(nap, rx).await {
119                Ok(Ok(url)) => {
120                    self.url = url;
121                    self.url_rx = None;
122                    return Ok(Some(self.url.as_str()));
123                }
124                Ok(Err(_closed)) => {
125                    self.url_rx = None;
126                    // Keep looping: URL might still be extractable from buffered output.
127                }
128                Err(_timeout) => {
129                    // Keep looping; we'll re-check buffer.
130                }
131            }
132        }
133    }
134
135    async fn extract_url_from_captured_output(&self) -> Option<String> {
136        let mut combined = Vec::new();
137        combined.extend_from_slice(&self.stdout_buf.lock().await);
138        combined.extend_from_slice(&self.stderr_buf.lock().await);
139        let text = String::from_utf8_lossy(&combined);
140        extract_oauth_url(&text)
141    }
142
143    pub async fn submit_code(mut self, code: &str) -> Result<CommandOutput, ClaudeCodeError> {
144        let process = self
145            .process
146            .as_mut()
147            .expect("setup-token session process present");
148
149        match process {
150            SetupTokenProcess::Pipes { stdin, .. } => {
151                if let Some(mut stdin) = stdin.take() {
152                    let mut bytes = code.as_bytes().to_vec();
153                    if !bytes.ends_with(b"\n") {
154                        bytes.push(b'\n');
155                    }
156                    stdin
157                        .write_all(&bytes)
158                        .await
159                        .map_err(ClaudeCodeError::StdinWrite)?;
160                }
161            }
162            #[cfg(unix)]
163            SetupTokenProcess::Pty { writer, .. } => {
164                if let Some(mut writer) = writer.take() {
165                    let mut bytes = code.as_bytes().to_vec();
166                    if !bytes.ends_with(b"\n") {
167                        bytes.push(b'\n');
168                    }
169
170                    tokio::task::spawn_blocking(move || {
171                        writer.write_all(&bytes)?;
172                        writer.flush()
173                    })
174                    .await
175                    .map_err(|e| ClaudeCodeError::Join(e.to_string()))?
176                    .map_err(ClaudeCodeError::StdinWrite)?;
177                }
178            }
179        };
180        self.wait().await
181    }
182
183    #[cfg(unix)]
184    async fn reap_pty_until(
185        child: &mut Box<dyn portable_pty::Child + Send + Sync>,
186        deadline: time::Instant,
187        poll_interval: Duration,
188    ) -> Option<std::process::ExitStatus> {
189        loop {
190            match child.try_wait() {
191                Ok(Some(exit)) => return Some(portable_exit_status_to_std(exit)),
192                Ok(None) => {
193                    if time::Instant::now() >= deadline {
194                        return None;
195                    }
196                    time::sleep(poll_interval).await;
197                }
198                Err(_) => return None,
199            }
200        }
201    }
202
203    pub async fn wait(mut self) -> Result<CommandOutput, ClaudeCodeError> {
204        let process = self
205            .process
206            .take()
207            .expect("setup-token session process present");
208
209        let timeout = self.timeout;
210        let mut status: Option<std::process::ExitStatus> = None;
211        let mut timed_out: Option<Duration> = None;
212        let mut wait_error: Option<ClaudeCodeError> = None;
213        let mut cleanup_deadline: Option<time::Instant> = None;
214
215        match process {
216            SetupTokenProcess::Pipes { mut child, .. } => {
217                if let Some(dur) = timeout {
218                    match time::timeout(dur, child.wait()).await {
219                        Ok(Ok(exit)) => {
220                            status = Some(exit);
221                        }
222                        Ok(Err(source)) => {
223                            wait_error = Some(ClaudeCodeError::Wait(source));
224                        }
225                        Err(_) => {
226                            timed_out = Some(dur);
227                        }
228                    }
229                } else {
230                    match child.wait().await {
231                        Ok(exit) => {
232                            status = Some(exit);
233                        }
234                        Err(source) => {
235                            wait_error = Some(ClaudeCodeError::Wait(source));
236                        }
237                    }
238                }
239
240                if timed_out.is_some() || wait_error.is_some() {
241                    let deadline =
242                        cleanup_deadline.get_or_insert(time::Instant::now() + CLEANUP_GRACE);
243                    let _ = child.start_kill();
244                    let remaining = deadline.saturating_duration_since(time::Instant::now());
245                    let _ = time::timeout(remaining, child.wait()).await;
246                }
247            }
248            #[cfg(unix)]
249            SetupTokenProcess::Pty { mut child, .. } => {
250                let poll_interval = Duration::from_millis(50);
251                let user_deadline = timeout.map(|dur| time::Instant::now() + dur);
252                loop {
253                    match child.try_wait() {
254                        Ok(Some(exit)) => {
255                            status = Some(portable_exit_status_to_std(exit));
256                            break;
257                        }
258                        Ok(None) => {
259                            if let Some(deadline) = user_deadline {
260                                if time::Instant::now() >= deadline {
261                                    timed_out = timeout;
262                                    let deadline = cleanup_deadline
263                                        .get_or_insert(time::Instant::now() + CLEANUP_GRACE);
264                                    let _ = child.kill();
265                                    if let Some(exit) =
266                                        Self::reap_pty_until(&mut child, *deadline, poll_interval)
267                                            .await
268                                    {
269                                        status = Some(exit);
270                                    }
271                                    break;
272                                }
273                            }
274                            time::sleep(poll_interval).await;
275                        }
276                        Err(source) => {
277                            wait_error = Some(ClaudeCodeError::Wait(source));
278                            let deadline = cleanup_deadline
279                                .get_or_insert(time::Instant::now() + CLEANUP_GRACE);
280                            let _ = child.kill();
281                            if let Some(exit) =
282                                Self::reap_pty_until(&mut child, *deadline, poll_interval).await
283                            {
284                                status = Some(exit);
285                            }
286                            break;
287                        }
288                    }
289                }
290            }
291        };
292
293        let cleanup_deadline = cleanup_deadline.unwrap_or_else(time::Instant::now);
294        let failure = timed_out.is_some() || wait_error.is_some();
295
296        if failure {
297            if let Some(task) = self.stdout_task.take() {
298                let remaining = cleanup_deadline.saturating_duration_since(time::Instant::now());
299                let _ = join_or_abort(task, remaining).await;
300            }
301            if let Some(task) = self.stderr_task.take() {
302                let remaining = cleanup_deadline.saturating_duration_since(time::Instant::now());
303                let _ = join_or_abort(task, remaining).await;
304            }
305        } else {
306            if let Some(task) = self.stdout_task.take() {
307                task.await
308                    .map_err(|e| ClaudeCodeError::Join(e.to_string()))??;
309            }
310            if let Some(task) = self.stderr_task.take() {
311                task.await
312                    .map_err(|e| ClaudeCodeError::Join(e.to_string()))??;
313            }
314        }
315
316        let stdout = self.stdout_buf.lock().await.clone();
317        let stderr = self.stderr_buf.lock().await.clone();
318
319        if let Some(err) = wait_error {
320            return Err(err);
321        }
322        if let Some(dur) = timed_out {
323            return Err(ClaudeCodeError::Timeout { timeout: dur });
324        }
325
326        Ok(CommandOutput {
327            status: status.expect("setup-token wait should set status when no error is present"),
328            stdout,
329            stderr,
330        })
331    }
332}
333
334impl Drop for ClaudeSetupTokenSession {
335    fn drop(&mut self) {
336        // Best-effort cleanup; if the session is dropped before completion, avoid leaving
337        // an interactive `claude setup-token` process running.
338        let Some(process) = self.process.as_mut() else {
339            return;
340        };
341
342        match process {
343            SetupTokenProcess::Pipes { child, .. } => {
344                if child.id().is_some() {
345                    let _ = child.start_kill();
346                }
347            }
348            #[cfg(unix)]
349            SetupTokenProcess::Pty { child, .. } => {
350                let _ = child.kill();
351            }
352        }
353    }
354}