Skip to main content

ai_agent/bridge/
session_runner.rs

1//! Session spawner for bridge sessions.
2//!
3//! Translated from openclaudode/src/bridge/sessionRunner.ts
4
5use std::collections::VecDeque;
6use std::io::{BufRead, BufReader, Write};
7use std::process::{Child, Command, Stdio};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
11// =============================================================================
12// CONSTANTS
13// =============================================================================
14
15const MAX_ACTIVITIES: usize = 10;
16const MAX_STDERR_LINES: usize = 10;
17
18// =============================================================================
19// TYPES
20// =============================================================================
21
/// Final state recorded for a session once its child process is done.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionDoneStatus {
    /// The child finished successfully.
    Completed,
    /// The child finished unsuccessfully (or never started).
    Failed,
    /// The child was stopped before finishing on its own.
    Interrupted,
}

/// Formats the status as its lowercase label (`completed`, `failed`, `interrupted`).
impl std::fmt::Display for SessionDoneStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Interrupted => "interrupted",
        };
        f.write_str(label)
    }
}
39
/// Category of a single activity extracted from the child's output stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionActivityType {
    /// The assistant started a tool invocation.
    ToolStart,
    /// The assistant produced a text block.
    Text,
    /// The session reported a successful result.
    Result,
    /// The session reported a non-success result.
    Error,
}

/// One entry in the bounded history of recent session activities.
#[derive(Debug, Clone)]
pub struct SessionActivity {
    /// What kind of activity this is.
    pub activity_type: SessionActivityType,
    /// Short human-readable description suitable for a status line.
    pub summary: String,
    /// When the activity was observed, in milliseconds since the Unix epoch.
    pub timestamp: i64,
}
56
57/// Permission request from child CLI
58#[derive(Debug, Clone)]
59pub struct PermissionRequest {
60    pub request_id: String,
61    pub request: PermissionRequestInner,
62}
63
64#[derive(Debug, Clone)]
65pub struct PermissionRequestInner {
66    pub subtype: String,
67    pub tool_name: String,
68    pub input: serde_json::Value,
69    pub tool_use_id: String,
70}
71
/// Per-session parameters handed to the spawner closure.
#[derive(Clone)]
pub struct SessionSpawnOpts {
    /// Identifier passed to the child via `--session-id`.
    pub session_id: String,
    /// URL passed to the child via `--sdk-url`.
    pub sdk_url: String,
    /// Token exported to the child as `AI_CODE_SESSION_ACCESS_TOKEN`.
    pub access_token: String,
    /// When true, spawn the child with the CCR v2 environment variables.
    pub use_ccr_v2: bool,
    /// Required when `use_ccr_v2` is true. Obtained from POST /worker/register.
    pub worker_epoch: Option<u64>,
    /// Callback intended to receive the text of the first real user message seen.
    pub on_first_user_message: Option<Arc<dyn Fn(String) + Send + Sync>>,
}
85
86/// Session handle for controlling a spawned session
87pub struct SessionHandle {
88    pub session_id: String,
89    pub done: Arc<Mutex<Option<SessionDoneStatus>>>,
90    pub activities: Arc<Mutex<VecDeque<SessionActivity>>>,
91    pub current_activity: Arc<Mutex<Option<SessionActivity>>>,
92    pub access_token: Arc<Mutex<String>>,
93    pub last_stderr: Arc<Mutex<VecDeque<String>>>,
94    child: Arc<Mutex<Option<Child>>>,
95    stdin: Arc<Mutex<Option<std::process::ChildStdin>>>,
96    killed: Arc<Mutex<bool>>,
97    sigkill_sent: Arc<Mutex<bool>>,
98}
99
100impl SessionHandle {
101    /// Kill the session gracefully (SIGTERM)
102    pub fn kill(&self) {
103        let mut killed = self.killed.lock().unwrap();
104        if *killed {
105            return;
106        }
107        *killed = true;
108
109        if let Ok(mut child_guard) = self.child.lock() {
110            if let Some(ref mut child) = *child_guard {
111                let _ = child.kill();
112            }
113        }
114    }
115
116    /// Force kill the session (SIGKILL)
117    pub fn force_kill(&self) {
118        let mut sent = self.sigkill_sent.lock().unwrap();
119        if *sent {
120            return;
121        }
122
123        if let Ok(mut child_guard) = self.child.lock() {
124            if let Some(ref mut child) = *child_guard {
125                if child.id() > 0 {
126                    *sent = true;
127                    let _ = child.kill();
128                }
129            }
130        }
131    }
132
133    /// Write directly to child stdin
134    pub fn write_stdin(&self, data: &str) {
135        if let Ok(mut stdin_guard) = self.stdin.lock() {
136            if let Some(ref mut stdin) = *stdin_guard {
137                let _ = stdin.write_all(data.as_bytes());
138                let _ = stdin.flush();
139            }
140        }
141    }
142
143    /// Update the access token for a running session
144    pub fn update_access_token(&self, token: String) {
145        if let Ok(mut access) = self.access_token.lock() {
146            *access = token.clone();
147        }
148
149        // Send the fresh token to the child process via stdin
150        let msg = serde_json::json!({
151            "type": "update_environment_variables",
152            "variables": { "AI_CODE_SESSION_ACCESS_TOKEN": token }
153        });
154        self.write_stdin(&format!("{}\n", msg));
155    }
156
157    /// Get current activity
158    pub fn get_current_activity(&self) -> Option<SessionActivity> {
159        self.current_activity.lock().ok().and_then(|g| g.clone())
160    }
161
162    /// Get activities
163    pub fn get_activities(&self) -> Vec<SessionActivity> {
164        self.activities
165            .lock()
166            .ok()
167            .map(|g| g.iter().cloned().collect())
168            .unwrap_or_default()
169    }
170
171    /// Get last stderr lines
172    pub fn get_last_stderr(&self) -> Vec<String> {
173        self.last_stderr
174            .lock()
175            .ok()
176            .map(|g| g.iter().cloned().collect())
177            .unwrap_or_default()
178    }
179
180    /// Get access token
181    pub fn get_access_token(&self) -> String {
182        self.access_token
183            .lock()
184            .ok()
185            .map(|g| g.clone())
186            .unwrap_or_default()
187    }
188}
189
190/// Session spawner dependencies
191pub struct SessionSpawnerDeps {
192    pub exec_path: String,
193    /// Arguments that must precede the CLI flags when spawning
194    pub script_args: Vec<String>,
195    pub env: std::collections::HashMap<String, String>,
196    pub verbose: bool,
197    pub sandbox: bool,
198    pub debug_file: Option<String>,
199    pub permission_mode: Option<String>,
200    pub on_debug: Arc<dyn Fn(String) + Send + Sync>,
201    pub on_activity: Option<Arc<dyn Fn(String, SessionActivity) + Send + Sync>>,
202    pub on_permission_request: Option<Arc<dyn Fn(String, PermissionRequest, String) + Send + Sync>>,
203}
204
205impl Default for SessionSpawnerDeps {
206    fn default() -> Self {
207        Self {
208            exec_path: String::new(),
209            script_args: Vec::new(),
210            env: std::collections::HashMap::new(),
211            verbose: false,
212            sandbox: false,
213            debug_file: None,
214            permission_mode: None,
215            on_debug: Arc::new(|_| {}),
216            on_activity: None,
217            on_permission_request: None,
218        }
219    }
220}
221
222// =============================================================================
223// HELPER FUNCTIONS
224// =============================================================================
225
/// Translate a tool name into the verb shown in status displays.
///
/// Names without a known verb are returned unchanged.
fn tool_verb(name: &str) -> String {
    // (tool name, display verb) pairs; tool names are unique.
    const VERBS: &[(&str, &str)] = &[
        ("Read", "Reading"),
        ("Write", "Writing"),
        ("Edit", "Editing"),
        ("MultiEdit", "Editing"),
        ("Bash", "Running"),
        ("Glob", "Searching"),
        ("Grep", "Searching"),
        ("WebFetch", "Fetching"),
        ("WebSearch", "Searching"),
        ("Task", "Running task"),
        ("FileReadTool", "Reading"),
        ("FileWriteTool", "Writing"),
        ("FileEditTool", "Editing"),
        ("GlobTool", "Searching"),
        ("GrepTool", "Searching"),
        ("BashTool", "Running"),
        ("NotebookEditTool", "Editing notebook"),
        ("LSP", "LSP"),
    ];

    VERBS
        .iter()
        .find(|entry| entry.0 == name)
        .map_or(name, |entry| entry.1)
        .to_string()
}
251
252/// Extract summary from tool invocation
253fn tool_summary(name: &str, input: &serde_json::Value) -> String {
254    let verb = tool_verb(name);
255
256    let target = input
257        .get("file_path")
258        .or_else(|| input.get("filePath"))
259        .or_else(|| input.get("pattern"))
260        .or_else(|| input.get("command"))
261        .or_else(|| input.get("url"))
262        .or_else(|| input.get("query"))
263        .and_then(|v| v.as_str())
264        .map(|s| {
265            if s.len() > 60 {
266                format!("{}...", &s[..60])
267            } else {
268                s.to_string()
269            }
270        });
271
272    match target {
273        Some(t) => format!("{} {}", verb, t),
274        None => verb.to_string(),
275    }
276}
277
278/// Build a short preview of tool input for debug logging
279fn input_preview(input: &serde_json::Value) -> String {
280    let mut parts = Vec::new();
281    if let Some(obj) = input.as_object() {
282        for (key, val) in obj.iter().take(3) {
283            if let Some(s) = val.as_str() {
284                let truncated = if s.len() > 100 {
285                    format!("{}...", &s[..100])
286                } else {
287                    s.to_string()
288                };
289                parts.push(format!("{}=\"{}\"", key, truncated));
290            }
291        }
292    }
293    parts.join(" ")
294}
295
296/// Extract activities from an NDJSON line
297fn extract_activities(
298    line: &str,
299    session_id: &str,
300    on_debug: &Arc<dyn Fn(String) + Send + Sync>,
301) -> Vec<SessionActivity> {
302    let parsed: serde_json::Value = match serde_json::from_str(line) {
303        Ok(v) => v,
304        Err(_) => return Vec::new(),
305    };
306
307    let obj = match parsed.as_object() {
308        Some(o) => o,
309        None => return Vec::new(),
310    };
311
312    let mut activities = Vec::new();
313    let now = std::time::SystemTime::now()
314        .duration_since(std::time::UNIX_EPOCH)
315        .map(|d| d.as_millis() as i64)
316        .unwrap_or(0);
317
318    // Handle assistant messages (tool_use, text)
319    if let Some(msg_type) = obj.get("type").and_then(|v| v.as_str()) {
320        if msg_type == "assistant" {
321            if let Some(message) = obj.get("message").and_then(|v| v.as_object()) {
322                if let Some(content) = message.get("content").and_then(|v| v.as_array()) {
323                    for block in content {
324                        let block_obj = match block.as_object() {
325                            Some(o) => o,
326                            None => continue,
327                        };
328
329                        let block_type = match block_obj.get("type").and_then(|v| v.as_str()) {
330                            Some(t) => t,
331                            None => continue,
332                        };
333
334                        if block_type == "tool_use" {
335                            let name = block_obj
336                                .get("name")
337                                .and_then(|v| v.as_str())
338                                .unwrap_or("Tool");
339                            let input = block_obj.get("input").unwrap_or(&serde_json::Value::Null);
340                            let summary = tool_summary(name, input);
341
342                            on_debug(format!(
343                                "[bridge:activity] sessionId={} tool_use name={} {}",
344                                session_id,
345                                name,
346                                input_preview(input)
347                            ));
348
349                            activities.push(SessionActivity {
350                                activity_type: SessionActivityType::ToolStart,
351                                summary,
352                                timestamp: now,
353                            });
354                        } else if block_type == "text" {
355                            if let Some(text) = block_obj.get("text").and_then(|v| v.as_str()) {
356                                if !text.is_empty() {
357                                    let summary = if text.len() > 80 {
358                                        format!("{}...", &text[..80])
359                                    } else {
360                                        text.to_string()
361                                    };
362
363                                    on_debug(format!(
364                                        "[bridge:activity] sessionId={} text \"{}\"",
365                                        session_id,
366                                        if text.len() > 100 {
367                                            format!("{}...", &text[..100])
368                                        } else {
369                                            text.to_string()
370                                        }
371                                    ));
372
373                                    activities.push(SessionActivity {
374                                        activity_type: SessionActivityType::Text,
375                                        summary,
376                                        timestamp: now,
377                                    });
378                                }
379                            }
380                        }
381                    }
382                }
383            }
384        } else if msg_type == "result" {
385            let subtype = obj.get("subtype").and_then(|v| v.as_str());
386
387            if subtype == Some("success") {
388                on_debug(format!(
389                    "[bridge:activity] sessionId={} result subtype=success",
390                    session_id
391                ));
392
393                activities.push(SessionActivity {
394                    activity_type: SessionActivityType::Result,
395                    summary: "Session completed".to_string(),
396                    timestamp: now,
397                });
398            } else if let Some(sub) = subtype {
399                let errors = obj.get("errors").and_then(|v| v.as_array());
400                let error_summary = errors
401                    .and_then(|arr| arr.first())
402                    .and_then(|v| v.as_str())
403                    .map(|s| s.to_string())
404                    .unwrap_or_else(|| format!("Error: {}", sub));
405
406                on_debug(format!(
407                    "[bridge:activity] sessionId={} result subtype={} error=\"{}\"",
408                    session_id, sub, error_summary
409                ));
410
411                activities.push(SessionActivity {
412                    activity_type: SessionActivityType::Error,
413                    summary: error_summary,
414                    timestamp: now,
415                });
416            } else {
417                on_debug(format!(
418                    "[bridge:activity] sessionId={} result subtype=undefined",
419                    session_id
420                ));
421            }
422        }
423    }
424
425    activities
426}
427
428/// Extract plain text from a user message NDJSON line.
429fn extract_user_message_text(msg: &serde_json::Value) -> Option<String> {
430    let obj = msg.as_object()?;
431
432    // Skip tool-result user messages (wrapped subagent results) and synthetic messages
433    if obj.get("parent_tool_use_id").is_some()
434        || obj
435            .get("isSynthetic")
436            .and_then(|v| v.as_bool())
437            .unwrap_or(false)
438        || obj
439            .get("isReplay")
440            .and_then(|v| v.as_bool())
441            .unwrap_or(false)
442    {
443        return None;
444    }
445
446    let message = obj.get("message")?.as_object()?;
447    let content = message.get("content")?;
448
449    let text = if let Some(s) = content.as_str() {
450        Some(s.to_string())
451    } else if let Some(arr) = content.as_array() {
452        for block in arr {
453            if let Some(block_obj) = block.as_object() {
454                if block_obj.get("type").and_then(|v| v.as_str()) == Some("text") {
455                    if let Some(text) = block_obj.get("text").and_then(|v| v.as_str()) {
456                        return Some(text.trim().to_string());
457                    }
458                }
459            }
460        }
461        None
462    } else {
463        None
464    };
465
466    text.filter(|s| !s.is_empty())
467}
468
469// =============================================================================
470// SESSION SPAWNER
471// =============================================================================
472
473/// Create a session spawner
474pub fn create_session_spawner(
475    deps: SessionSpawnerDeps,
476) -> impl Fn(SessionSpawnOpts, &str) -> SessionHandle {
477    move |opts: SessionSpawnOpts, dir: &str| {
478        let on_debug = &deps.on_debug;
479
480        // Build args
481        let mut args = deps.script_args.clone();
482        args.push("--print".to_string());
483        args.push("--sdk-url".to_string());
484        args.push(opts.sdk_url.clone());
485        args.push("--session-id".to_string());
486        args.push(opts.session_id.clone());
487        args.push("--input-format".to_string());
488        args.push("stream-json".to_string());
489        args.push("--output-format".to_string());
490        args.push("stream-json".to_string());
491        args.push("--replay-user-messages".to_string());
492
493        if deps.verbose {
494            args.push("--verbose".to_string());
495        }
496
497        if let Some(ref debug_file) = deps.debug_file {
498            args.push("--debug-file".to_string());
499            args.push(debug_file.clone());
500        }
501
502        if let Some(ref permission_mode) = deps.permission_mode {
503            args.push("--permission-mode".to_string());
504            args.push(permission_mode.clone());
505        }
506
507        // Build env
508        let mut env = deps.env.clone();
509        env.remove("AI_CODE_OAUTH_TOKEN");
510        env.insert("AI_CODE_ENVIRONMENT_KIND".to_string(), "bridge".to_string());
511
512        if deps.sandbox {
513            env.insert("AI_CODE_FORCE_SANDBOX".to_string(), "1".to_string());
514        }
515
516        env.insert(
517            "AI_CODE_SESSION_ACCESS_TOKEN".to_string(),
518            opts.access_token.clone(),
519        );
520
521        // v1: HybridTransport
522        env.insert(
523            "AI_CODE_POST_FOR_SESSION_INGRESS_V2".to_string(),
524            "1".to_string(),
525        );
526
527        // v2: SSETransport + CCRClient
528        if opts.use_ccr_v2 {
529            env.insert("AI_CODE_USE_CCR_V2".to_string(), "1".to_string());
530            if let Some(epoch) = opts.worker_epoch {
531                env.insert("AI_CODE_WORKER_EPOCH".to_string(), epoch.to_string());
532            }
533        }
534
535        on_debug(format!(
536            "[bridge:session] Spawning sessionId={} sdkUrl={} accessToken={}",
537            opts.session_id,
538            opts.sdk_url,
539            if opts.access_token.is_empty() {
540                "MISSING"
541            } else {
542                "present"
543            }
544        ));
545        on_debug(format!("[bridge:session] Child args: {:?}", args));
546
547        // Spawn child process
548        let mut child = Command::new(&deps.exec_path);
549        child.args(&args);
550        child.current_dir(dir);
551        child.envs(&env);
552        child.stdin(Stdio::piped());
553        child.stdout(Stdio::piped());
554        child.stderr(Stdio::piped());
555
556        #[cfg(windows)]
557        child.windows_hide(true);
558
559        let mut child = match child.spawn() {
560            Ok(c) => c,
561            Err(e) => {
562                on_debug(format!(
563                    "[bridge:session] sessionId={} spawn error: {}",
564                    opts.session_id, e
565                ));
566                // Return a failed session handle
567                return SessionHandle {
568                    session_id: opts.session_id,
569                    done: Arc::new(Mutex::new(Some(SessionDoneStatus::Failed))),
570                    activities: Arc::new(Mutex::new(VecDeque::new())),
571                    current_activity: Arc::new(Mutex::new(None)),
572                    access_token: Arc::new(Mutex::new(opts.access_token)),
573                    last_stderr: Arc::new(Mutex::new(VecDeque::new())),
574                    child: Arc::new(Mutex::new(None)),
575                    stdin: Arc::new(Mutex::new(None)),
576                    killed: Arc::new(Mutex::new(true)),
577                    sigkill_sent: Arc::new(Mutex::new(true)),
578                };
579            }
580        };
581
582        let pid = child.id();
583        on_debug(format!(
584            "[bridge:session] sessionId={} pid={}",
585            opts.session_id, pid
586        ));
587
588        // Get stdin
589        let stdin = child.stdin.take();
590
591        // Initialize state
592        let activities: Arc<Mutex<VecDeque<SessionActivity>>> =
593            Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ACTIVITIES)));
594        let current_activity: Arc<Mutex<Option<SessionActivity>>> = Arc::new(Mutex::new(None));
595        let last_stderr: Arc<Mutex<VecDeque<String>>> =
596            Arc::new(Mutex::new(VecDeque::with_capacity(MAX_STDERR_LINES)));
597        let done_status: Arc<Mutex<Option<SessionDoneStatus>>> = Arc::new(Mutex::new(None));
598
599        let session_id = opts.session_id.clone();
600        let on_activity = deps.on_activity.clone();
601        let on_permission_request = deps.on_permission_request.clone();
602        let verbose = deps.verbose;
603
604        // Handle stdout
605        if let Some(stdout) = child.stdout.take() {
606            let activities_clone = activities.clone();
607            let current_activity_clone = current_activity.clone();
608            let session_id_clone = session_id.clone();
609            let on_debug_clone = on_debug.clone();
610            let on_activity_clone = on_activity.clone();
611            let opts_clone = opts.clone();
612
613            thread::spawn(move || {
614                let reader = BufReader::new(stdout);
615                for line in reader.lines().map_while(Result::ok) {
616                    // Log message
617                    on_debug_clone(format!(
618                        "[bridge:ws] sessionId={} <<< {}",
619                        session_id_clone,
620                        if line.len() > 200 {
621                            format!("{}...", &line[..200])
622                        } else {
623                            line.clone()
624                        }
625                    ));
626
627                    // Forward in verbose mode
628                    if verbose {
629                        eprintln!("{}", line);
630                    }
631
632                    // Extract activities
633                    let extracted = extract_activities(&line, &session_id_clone, &on_debug_clone);
634                    for activity in extracted {
635                        if let Ok(mut acts) = activities_clone.lock() {
636                            if acts.len() >= MAX_ACTIVITIES {
637                                acts.pop_front();
638                            }
639                            acts.push_back(activity.clone());
640
641                            if let Ok(mut current) = current_activity_clone.lock() {
642                                *current = Some(activity.clone());
643                            }
644
645                            if let Some(ref callback) = on_activity_clone {
646                                callback(session_id_clone.clone(), activity);
647                            }
648                        }
649                    }
650
651                    // Check for control_request and user messages
652                    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&line) {
653                        if let Some(obj) = parsed.as_object() {
654                            if obj.get("type").and_then(|v| v.as_str()) == Some("control_request") {
655                                if let Some(request) =
656                                    obj.get("request").and_then(|v| v.as_object())
657                                {
658                                    if request.get("subtype").and_then(|v| v.as_str())
659                                        == Some("can_use_tool")
660                                    {
661                                        if let Some(ref callback) = on_permission_request {
662                                            let perm_request = PermissionRequest {
663                                                request_id: obj
664                                                    .get("request_id")
665                                                    .and_then(|v| v.as_str())
666                                                    .unwrap_or("")
667                                                    .to_string(),
668                                                request: PermissionRequestInner {
669                                                    subtype: request
670                                                        .get("subtype")
671                                                        .and_then(|v| v.as_str())
672                                                        .unwrap_or("")
673                                                        .to_string(),
674                                                    tool_name: request
675                                                        .get("tool_name")
676                                                        .and_then(|v| v.as_str())
677                                                        .unwrap_or("")
678                                                        .to_string(),
679                                                    input: request
680                                                        .get("input")
681                                                        .cloned()
682                                                        .unwrap_or(serde_json::Value::Null),
683                                                    tool_use_id: request
684                                                        .get("tool_use_id")
685                                                        .and_then(|v| v.as_str())
686                                                        .unwrap_or("")
687                                                        .to_string(),
688                                                },
689                                            };
690                                            callback(
691                                                opts_clone.session_id.clone(),
692                                                perm_request,
693                                                opts_clone.access_token.clone(),
694                                            );
695                                        }
696                                    }
697                                }
698                            } else if obj.get("type").and_then(|v| v.as_str()) == Some("user") {
699                                if let Some(text) = extract_user_message_text(&parsed) {
700                                    if let Some(ref callback) = opts_clone.on_first_user_message {
701                                        callback(text);
702                                    }
703                                }
704                            }
705                        }
706                    }
707                }
708            });
709        }
710
711        // Handle stderr
712        if let Some(stderr) = child.stderr.take() {
713            let last_stderr_clone = last_stderr.clone();
714            let on_debug_clone = on_debug.clone();
715
716            thread::spawn(move || {
717                let reader = BufReader::new(stderr);
718                for line in reader.lines().map_while(Result::ok) {
719                    // Forward to stderr in verbose mode
720                    if verbose {
721                        eprintln!("{}", line);
722                    }
723
724                    // Ring buffer of last N lines
725                    if let Ok(mut stderr_lines) = last_stderr_clone.lock() {
726                        if stderr_lines.len() >= MAX_STDERR_LINES {
727                            stderr_lines.pop_front();
728                        }
729                        stderr_lines.push_back(line.clone());
730                    }
731
732                    on_debug_clone(line);
733                }
734            });
735        }
736
737        // Wait for child to exit
738        let session_id_clone = session_id.clone();
739        let on_debug_clone = on_debug.clone();
740        let done_status_clone = done_status.clone();
741        let child_for_handle = Arc::new(Mutex::new(Some(child)));
742        let child_for_thread = child_for_handle.clone();
743
744        thread::spawn(move || {
745            let mut child_guard = child_for_thread.lock().unwrap();
746            if let Some(ref mut child) = *child_guard {
747                let status = child.wait();
748                let on_debug = on_debug_clone;
749
750                match status {
751                    Ok(exit_status) => {
752                        // Check for interruption signals via exit code
753                        // 15 = SIGTERM, 2 = SIGINT (platform-specific)
754                        let code = exit_status.code().unwrap_or(-1);
755                        if code == 15 || code == 2 || code == -11 {
756                            on_debug(format!(
757                                "[bridge:session] sessionId={} interrupted exit_code={} pid={}",
758                                session_id_clone,
759                                code,
760                                child.id()
761                            ));
762                            if let Ok(mut status) = done_status_clone.lock() {
763                                *status = Some(SessionDoneStatus::Interrupted);
764                            }
765                        } else if exit_status.success() {
766                            on_debug(format!(
767                                "[bridge:session] sessionId={} completed exit_code=0 pid={}",
768                                session_id_clone,
769                                child.id()
770                            ));
771                            if let Ok(mut status) = done_status_clone.lock() {
772                                *status = Some(SessionDoneStatus::Completed);
773                            }
774                        } else {
775                            on_debug(format!(
776                                "[bridge:session] sessionId={} failed exit_code={:?} pid={}",
777                                session_id_clone,
778                                exit_status.code(),
779                                child.id()
780                            ));
781                            if let Ok(mut status) = done_status_clone.lock() {
782                                *status = Some(SessionDoneStatus::Failed);
783                            }
784                        }
785                    }
786                    Err(e) => {
787                        on_debug(format!(
788                            "[bridge:session] sessionId={} wait error: {}",
789                            session_id_clone, e
790                        ));
791                        if let Ok(mut status) = done_status_clone.lock() {
792                            *status = Some(SessionDoneStatus::Failed);
793                        }
794                    }
795                }
796            }
797        });
798
799        SessionHandle {
800            session_id: opts.session_id,
801            done: done_status,
802            activities,
803            current_activity,
804            access_token: Arc::new(Mutex::new(opts.access_token)),
805            last_stderr,
806            child: child_for_handle,
807            stdin: Arc::new(Mutex::new(stdin)),
808            killed: Arc::new(Mutex::new(false)),
809            sigkill_sent: Arc::new(Mutex::new(false)),
810        }
811    }
812}
813
/// Sanitize a session ID for use in file names.
///
/// Every character other than an ASCII letter, an ASCII digit, `-` or `_` is
/// replaced with `_`. This removes path separators and dots (so the result
/// cannot traverse directories) as well as non-ASCII characters, which are
/// not reliably portable across filesystems. The output has the same number
/// of characters as the input.
pub fn safe_filename_id(id: &str) -> String {
    id.chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
                c
            } else {
                '_'
            }
        })
        .collect()
}
827
#[cfg(test)]
mod tests {
    use super::*;

    /// Safe IDs pass through untouched; dots and path separators become underscores.
    #[test]
    fn test_safe_filename_id() {
        let cases = [
            ("session_abc123", "session_abc123"),
            ("cse_abc-123", "cse_abc-123"),
            ("../etc/passwd", "___etc_passwd"),
        ];
        for (raw, expected) in cases.iter() {
            assert_eq!(safe_filename_id(raw), *expected);
        }
    }

    /// The summary is the tool's verb followed by its primary target.
    #[test]
    fn test_tool_summary() {
        let read_input = serde_json::json!({ "file_path": "/path/to/file.txt" });
        let read_summary = tool_summary("Read", &read_input);
        assert_eq!(read_summary, "Reading /path/to/file.txt");

        let bash_input = serde_json::json!({ "command": "ls -la" });
        let bash_summary = tool_summary("Bash", &bash_input);
        assert_eq!(bash_summary, "Running ls -la");
    }

    /// String-valued fields show up in the preview as `key="value"`.
    #[test]
    fn test_input_preview() {
        let input = serde_json::json!({
            "file_path": "/test.txt",
            "content": "hello world"
        });
        let preview = input_preview(&input);
        assert!(preview.contains("file_path="));
    }
}