1use std::collections::VecDeque;
6use std::io::{BufRead, BufReader, Write};
7use std::process::{Child, Command, Stdio};
8use std::sync::{Arc, Mutex};
9use std::thread;
10
/// Upper bound on the number of recent activities retained per session;
/// once reached, the oldest entry is dropped before a new one is pushed.
const MAX_ACTIVITIES: usize = 10;
/// Upper bound on the number of trailing child stderr lines retained per
/// session; once reached, the oldest line is dropped before a new one is pushed.
const MAX_STDERR_LINES: usize = 10;
17
/// Terminal state recorded for a session once its child process is finished.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SessionDoneStatus {
    /// The child exited successfully.
    Completed,
    /// The child could not be spawned, could not be waited on, or exited
    /// unsuccessfully.
    Failed,
    /// The child's exit was classified as an interruption.
    Interrupted,
}
29
30impl std::fmt::Display for SessionDoneStatus {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 SessionDoneStatus::Completed => write!(f, "completed"),
34 SessionDoneStatus::Failed => write!(f, "failed"),
35 SessionDoneStatus::Interrupted => write!(f, "interrupted"),
36 }
37 }
38}
39
/// Category of a [`SessionActivity`] derived from the child's output stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SessionActivityType {
    /// An assistant `tool_use` content block was seen.
    ToolStart,
    /// A non-empty assistant `text` content block was seen.
    Text,
    /// A `result` message with subtype `success` was seen.
    Result,
    /// A `result` message with any other subtype was seen.
    Error,
}
48
49#[derive(Debug, Clone)]
51pub struct SessionActivity {
52 pub activity_type: SessionActivityType,
53 pub summary: String,
54 pub timestamp: i64,
55}
56
57#[derive(Debug, Clone)]
59pub struct PermissionRequest {
60 pub request_id: String,
61 pub request: PermissionRequestInner,
62}
63
64#[derive(Debug, Clone)]
65pub struct PermissionRequestInner {
66 pub subtype: String,
67 pub tool_name: String,
68 pub input: serde_json::Value,
69 pub tool_use_id: String,
70}
71
/// Per-session parameters handed to the spawner closure.
#[derive(Clone)]
pub struct SessionSpawnOpts {
    /// Passed to the child as `--session-id`.
    pub session_id: String,
    /// Passed to the child as `--sdk-url`.
    pub sdk_url: String,
    /// Exported to the child as `AI_CODE_SESSION_ACCESS_TOKEN`.
    pub access_token: String,
    /// When set, `AI_CODE_USE_CCR_V2=1` is exported to the child.
    pub use_ccr_v2: bool,
    /// Exported as `AI_CODE_WORKER_EPOCH`, but only when `use_ccr_v2` is set.
    pub worker_epoch: Option<u64>,
    /// Optional callback that receives the text of a user message observed
    /// on the child's stdout.
    pub on_first_user_message: Option<Arc<dyn Fn(String) + Send + Sync>>,
}
85
86pub struct SessionHandle {
88 pub session_id: String,
89 pub done: Arc<Mutex<Option<SessionDoneStatus>>>,
90 pub activities: Arc<Mutex<VecDeque<SessionActivity>>>,
91 pub current_activity: Arc<Mutex<Option<SessionActivity>>>,
92 pub access_token: Arc<Mutex<String>>,
93 pub last_stderr: Arc<Mutex<VecDeque<String>>>,
94 child: Arc<Mutex<Option<Child>>>,
95 stdin: Arc<Mutex<Option<std::process::ChildStdin>>>,
96 killed: Arc<Mutex<bool>>,
97 sigkill_sent: Arc<Mutex<bool>>,
98}
99
100impl SessionHandle {
101 pub fn kill(&self) {
103 let mut killed = self.killed.lock().unwrap();
104 if *killed {
105 return;
106 }
107 *killed = true;
108
109 if let Ok(mut child_guard) = self.child.lock() {
110 if let Some(ref mut child) = *child_guard {
111 let _ = child.kill();
112 }
113 }
114 }
115
116 pub fn force_kill(&self) {
118 let mut sent = self.sigkill_sent.lock().unwrap();
119 if *sent {
120 return;
121 }
122
123 if let Ok(mut child_guard) = self.child.lock() {
124 if let Some(ref mut child) = *child_guard {
125 if child.id() > 0 {
126 *sent = true;
127 let _ = child.kill();
128 }
129 }
130 }
131 }
132
133 pub fn write_stdin(&self, data: &str) {
135 if let Ok(mut stdin_guard) = self.stdin.lock() {
136 if let Some(ref mut stdin) = *stdin_guard {
137 let _ = stdin.write_all(data.as_bytes());
138 let _ = stdin.flush();
139 }
140 }
141 }
142
143 pub fn update_access_token(&self, token: String) {
145 if let Ok(mut access) = self.access_token.lock() {
146 *access = token.clone();
147 }
148
149 let msg = serde_json::json!({
151 "type": "update_environment_variables",
152 "variables": { "AI_CODE_SESSION_ACCESS_TOKEN": token }
153 });
154 self.write_stdin(&format!("{}\n", msg));
155 }
156
157 pub fn get_current_activity(&self) -> Option<SessionActivity> {
159 self.current_activity.lock().ok().and_then(|g| g.clone())
160 }
161
162 pub fn get_activities(&self) -> Vec<SessionActivity> {
164 self.activities
165 .lock()
166 .ok()
167 .map(|g| g.iter().cloned().collect())
168 .unwrap_or_default()
169 }
170
171 pub fn get_last_stderr(&self) -> Vec<String> {
173 self.last_stderr
174 .lock()
175 .ok()
176 .map(|g| g.iter().cloned().collect())
177 .unwrap_or_default()
178 }
179
180 pub fn get_access_token(&self) -> String {
182 self.access_token
183 .lock()
184 .ok()
185 .map(|g| g.clone())
186 .unwrap_or_default()
187 }
188}
189
190pub struct SessionSpawnerDeps {
192 pub exec_path: String,
193 pub script_args: Vec<String>,
195 pub env: std::collections::HashMap<String, String>,
196 pub verbose: bool,
197 pub sandbox: bool,
198 pub debug_file: Option<String>,
199 pub permission_mode: Option<String>,
200 pub on_debug: Arc<dyn Fn(String) + Send + Sync>,
201 pub on_activity: Option<Arc<dyn Fn(String, SessionActivity) + Send + Sync>>,
202 pub on_permission_request: Option<Arc<dyn Fn(String, PermissionRequest, String) + Send + Sync>>,
203}
204
205impl Default for SessionSpawnerDeps {
206 fn default() -> Self {
207 Self {
208 exec_path: String::new(),
209 script_args: Vec::new(),
210 env: std::collections::HashMap::new(),
211 verbose: false,
212 sandbox: false,
213 debug_file: None,
214 permission_mode: None,
215 on_debug: Arc::new(|_| {}),
216 on_activity: None,
217 on_permission_request: None,
218 }
219 }
220}
221
/// Maps a tool name to the verb phrase used in activity summaries.
///
/// Names without a dedicated verb (including `"LSP"`) are returned unchanged.
fn tool_verb(name: &str) -> String {
    String::from(match name {
        "Read" | "FileReadTool" => "Reading",
        "Write" | "FileWriteTool" => "Writing",
        "Edit" | "MultiEdit" | "FileEditTool" => "Editing",
        "Bash" | "BashTool" => "Running",
        "Glob" | "Grep" | "WebSearch" | "GlobTool" | "GrepTool" => "Searching",
        "WebFetch" => "Fetching",
        "Task" => "Running task",
        "NotebookEditTool" => "Editing notebook",
        unmapped => unmapped,
    })
}
251
252fn tool_summary(name: &str, input: &serde_json::Value) -> String {
254 let verb = tool_verb(name);
255
256 let target = input
257 .get("file_path")
258 .or_else(|| input.get("filePath"))
259 .or_else(|| input.get("pattern"))
260 .or_else(|| input.get("command"))
261 .or_else(|| input.get("url"))
262 .or_else(|| input.get("query"))
263 .and_then(|v| v.as_str())
264 .map(|s| {
265 if s.len() > 60 {
266 format!("{}...", &s[..60])
267 } else {
268 s.to_string()
269 }
270 });
271
272 match target {
273 Some(t) => format!("{} {}", verb, t),
274 None => verb.to_string(),
275 }
276}
277
278fn input_preview(input: &serde_json::Value) -> String {
280 let mut parts = Vec::new();
281 if let Some(obj) = input.as_object() {
282 for (key, val) in obj.iter().take(3) {
283 if let Some(s) = val.as_str() {
284 let truncated = if s.len() > 100 {
285 format!("{}...", &s[..100])
286 } else {
287 s.to_string()
288 };
289 parts.push(format!("{}=\"{}\"", key, truncated));
290 }
291 }
292 }
293 parts.join(" ")
294}
295
296fn extract_activities(
298 line: &str,
299 session_id: &str,
300 on_debug: &Arc<dyn Fn(String) + Send + Sync>,
301) -> Vec<SessionActivity> {
302 let parsed: serde_json::Value = match serde_json::from_str(line) {
303 Ok(v) => v,
304 Err(_) => return Vec::new(),
305 };
306
307 let obj = match parsed.as_object() {
308 Some(o) => o,
309 None => return Vec::new(),
310 };
311
312 let mut activities = Vec::new();
313 let now = std::time::SystemTime::now()
314 .duration_since(std::time::UNIX_EPOCH)
315 .map(|d| d.as_millis() as i64)
316 .unwrap_or(0);
317
318 if let Some(msg_type) = obj.get("type").and_then(|v| v.as_str()) {
320 if msg_type == "assistant" {
321 if let Some(message) = obj.get("message").and_then(|v| v.as_object()) {
322 if let Some(content) = message.get("content").and_then(|v| v.as_array()) {
323 for block in content {
324 let block_obj = match block.as_object() {
325 Some(o) => o,
326 None => continue,
327 };
328
329 let block_type = match block_obj.get("type").and_then(|v| v.as_str()) {
330 Some(t) => t,
331 None => continue,
332 };
333
334 if block_type == "tool_use" {
335 let name = block_obj
336 .get("name")
337 .and_then(|v| v.as_str())
338 .unwrap_or("Tool");
339 let input = block_obj.get("input").unwrap_or(&serde_json::Value::Null);
340 let summary = tool_summary(name, input);
341
342 on_debug(format!(
343 "[bridge:activity] sessionId={} tool_use name={} {}",
344 session_id,
345 name,
346 input_preview(input)
347 ));
348
349 activities.push(SessionActivity {
350 activity_type: SessionActivityType::ToolStart,
351 summary,
352 timestamp: now,
353 });
354 } else if block_type == "text" {
355 if let Some(text) = block_obj.get("text").and_then(|v| v.as_str()) {
356 if !text.is_empty() {
357 let summary = if text.len() > 80 {
358 format!("{}...", &text[..80])
359 } else {
360 text.to_string()
361 };
362
363 on_debug(format!(
364 "[bridge:activity] sessionId={} text \"{}\"",
365 session_id,
366 if text.len() > 100 {
367 format!("{}...", &text[..100])
368 } else {
369 text.to_string()
370 }
371 ));
372
373 activities.push(SessionActivity {
374 activity_type: SessionActivityType::Text,
375 summary,
376 timestamp: now,
377 });
378 }
379 }
380 }
381 }
382 }
383 }
384 } else if msg_type == "result" {
385 let subtype = obj.get("subtype").and_then(|v| v.as_str());
386
387 if subtype == Some("success") {
388 on_debug(format!(
389 "[bridge:activity] sessionId={} result subtype=success",
390 session_id
391 ));
392
393 activities.push(SessionActivity {
394 activity_type: SessionActivityType::Result,
395 summary: "Session completed".to_string(),
396 timestamp: now,
397 });
398 } else if let Some(sub) = subtype {
399 let errors = obj.get("errors").and_then(|v| v.as_array());
400 let error_summary = errors
401 .and_then(|arr| arr.first())
402 .and_then(|v| v.as_str())
403 .map(|s| s.to_string())
404 .unwrap_or_else(|| format!("Error: {}", sub));
405
406 on_debug(format!(
407 "[bridge:activity] sessionId={} result subtype={} error=\"{}\"",
408 session_id, sub, error_summary
409 ));
410
411 activities.push(SessionActivity {
412 activity_type: SessionActivityType::Error,
413 summary: error_summary,
414 timestamp: now,
415 });
416 } else {
417 on_debug(format!(
418 "[bridge:activity] sessionId={} result subtype=undefined",
419 session_id
420 ));
421 }
422 }
423 }
424
425 activities
426}
427
428fn extract_user_message_text(msg: &serde_json::Value) -> Option<String> {
430 let obj = msg.as_object()?;
431
432 if obj.get("parent_tool_use_id").is_some()
434 || obj
435 .get("isSynthetic")
436 .and_then(|v| v.as_bool())
437 .unwrap_or(false)
438 || obj
439 .get("isReplay")
440 .and_then(|v| v.as_bool())
441 .unwrap_or(false)
442 {
443 return None;
444 }
445
446 let message = obj.get("message")?.as_object()?;
447 let content = message.get("content")?;
448
449 let text = if let Some(s) = content.as_str() {
450 Some(s.to_string())
451 } else if let Some(arr) = content.as_array() {
452 for block in arr {
453 if let Some(block_obj) = block.as_object() {
454 if block_obj.get("type").and_then(|v| v.as_str()) == Some("text") {
455 if let Some(text) = block_obj.get("text").and_then(|v| v.as_str()) {
456 return Some(text.trim().to_string());
457 }
458 }
459 }
460 }
461 None
462 } else {
463 None
464 };
465
466 text.filter(|s| !s.is_empty())
467}
468
469pub fn create_session_spawner(
475 deps: SessionSpawnerDeps,
476) -> impl Fn(SessionSpawnOpts, &str) -> SessionHandle {
477 move |opts: SessionSpawnOpts, dir: &str| {
478 let on_debug = &deps.on_debug;
479
480 let mut args = deps.script_args.clone();
482 args.push("--print".to_string());
483 args.push("--sdk-url".to_string());
484 args.push(opts.sdk_url.clone());
485 args.push("--session-id".to_string());
486 args.push(opts.session_id.clone());
487 args.push("--input-format".to_string());
488 args.push("stream-json".to_string());
489 args.push("--output-format".to_string());
490 args.push("stream-json".to_string());
491 args.push("--replay-user-messages".to_string());
492
493 if deps.verbose {
494 args.push("--verbose".to_string());
495 }
496
497 if let Some(ref debug_file) = deps.debug_file {
498 args.push("--debug-file".to_string());
499 args.push(debug_file.clone());
500 }
501
502 if let Some(ref permission_mode) = deps.permission_mode {
503 args.push("--permission-mode".to_string());
504 args.push(permission_mode.clone());
505 }
506
507 let mut env = deps.env.clone();
509 env.remove("AI_CODE_OAUTH_TOKEN");
510 env.insert("AI_CODE_ENVIRONMENT_KIND".to_string(), "bridge".to_string());
511
512 if deps.sandbox {
513 env.insert("AI_CODE_FORCE_SANDBOX".to_string(), "1".to_string());
514 }
515
516 env.insert(
517 "AI_CODE_SESSION_ACCESS_TOKEN".to_string(),
518 opts.access_token.clone(),
519 );
520
521 env.insert(
523 "AI_CODE_POST_FOR_SESSION_INGRESS_V2".to_string(),
524 "1".to_string(),
525 );
526
527 if opts.use_ccr_v2 {
529 env.insert("AI_CODE_USE_CCR_V2".to_string(), "1".to_string());
530 if let Some(epoch) = opts.worker_epoch {
531 env.insert("AI_CODE_WORKER_EPOCH".to_string(), epoch.to_string());
532 }
533 }
534
535 on_debug(format!(
536 "[bridge:session] Spawning sessionId={} sdkUrl={} accessToken={}",
537 opts.session_id,
538 opts.sdk_url,
539 if opts.access_token.is_empty() {
540 "MISSING"
541 } else {
542 "present"
543 }
544 ));
545 on_debug(format!("[bridge:session] Child args: {:?}", args));
546
547 let mut child = Command::new(&deps.exec_path);
549 child.args(&args);
550 child.current_dir(dir);
551 child.envs(&env);
552 child.stdin(Stdio::piped());
553 child.stdout(Stdio::piped());
554 child.stderr(Stdio::piped());
555
556 #[cfg(windows)]
557 child.windows_hide(true);
558
559 let mut child = match child.spawn() {
560 Ok(c) => c,
561 Err(e) => {
562 on_debug(format!(
563 "[bridge:session] sessionId={} spawn error: {}",
564 opts.session_id, e
565 ));
566 return SessionHandle {
568 session_id: opts.session_id,
569 done: Arc::new(Mutex::new(Some(SessionDoneStatus::Failed))),
570 activities: Arc::new(Mutex::new(VecDeque::new())),
571 current_activity: Arc::new(Mutex::new(None)),
572 access_token: Arc::new(Mutex::new(opts.access_token)),
573 last_stderr: Arc::new(Mutex::new(VecDeque::new())),
574 child: Arc::new(Mutex::new(None)),
575 stdin: Arc::new(Mutex::new(None)),
576 killed: Arc::new(Mutex::new(true)),
577 sigkill_sent: Arc::new(Mutex::new(true)),
578 };
579 }
580 };
581
582 let pid = child.id();
583 on_debug(format!(
584 "[bridge:session] sessionId={} pid={}",
585 opts.session_id, pid
586 ));
587
588 let stdin = child.stdin.take();
590
591 let activities: Arc<Mutex<VecDeque<SessionActivity>>> =
593 Arc::new(Mutex::new(VecDeque::with_capacity(MAX_ACTIVITIES)));
594 let current_activity: Arc<Mutex<Option<SessionActivity>>> = Arc::new(Mutex::new(None));
595 let last_stderr: Arc<Mutex<VecDeque<String>>> =
596 Arc::new(Mutex::new(VecDeque::with_capacity(MAX_STDERR_LINES)));
597 let done_status: Arc<Mutex<Option<SessionDoneStatus>>> = Arc::new(Mutex::new(None));
598
599 let session_id = opts.session_id.clone();
600 let on_activity = deps.on_activity.clone();
601 let on_permission_request = deps.on_permission_request.clone();
602 let verbose = deps.verbose;
603
604 if let Some(stdout) = child.stdout.take() {
606 let activities_clone = activities.clone();
607 let current_activity_clone = current_activity.clone();
608 let session_id_clone = session_id.clone();
609 let on_debug_clone = on_debug.clone();
610 let on_activity_clone = on_activity.clone();
611 let opts_clone = opts.clone();
612
613 thread::spawn(move || {
614 let reader = BufReader::new(stdout);
615 for line in reader.lines().map_while(Result::ok) {
616 on_debug_clone(format!(
618 "[bridge:ws] sessionId={} <<< {}",
619 session_id_clone,
620 if line.len() > 200 {
621 format!("{}...", &line[..200])
622 } else {
623 line.clone()
624 }
625 ));
626
627 if verbose {
629 eprintln!("{}", line);
630 }
631
632 let extracted = extract_activities(&line, &session_id_clone, &on_debug_clone);
634 for activity in extracted {
635 if let Ok(mut acts) = activities_clone.lock() {
636 if acts.len() >= MAX_ACTIVITIES {
637 acts.pop_front();
638 }
639 acts.push_back(activity.clone());
640
641 if let Ok(mut current) = current_activity_clone.lock() {
642 *current = Some(activity.clone());
643 }
644
645 if let Some(ref callback) = on_activity_clone {
646 callback(session_id_clone.clone(), activity);
647 }
648 }
649 }
650
651 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&line) {
653 if let Some(obj) = parsed.as_object() {
654 if obj.get("type").and_then(|v| v.as_str()) == Some("control_request") {
655 if let Some(request) =
656 obj.get("request").and_then(|v| v.as_object())
657 {
658 if request.get("subtype").and_then(|v| v.as_str())
659 == Some("can_use_tool")
660 {
661 if let Some(ref callback) = on_permission_request {
662 let perm_request = PermissionRequest {
663 request_id: obj
664 .get("request_id")
665 .and_then(|v| v.as_str())
666 .unwrap_or("")
667 .to_string(),
668 request: PermissionRequestInner {
669 subtype: request
670 .get("subtype")
671 .and_then(|v| v.as_str())
672 .unwrap_or("")
673 .to_string(),
674 tool_name: request
675 .get("tool_name")
676 .and_then(|v| v.as_str())
677 .unwrap_or("")
678 .to_string(),
679 input: request
680 .get("input")
681 .cloned()
682 .unwrap_or(serde_json::Value::Null),
683 tool_use_id: request
684 .get("tool_use_id")
685 .and_then(|v| v.as_str())
686 .unwrap_or("")
687 .to_string(),
688 },
689 };
690 callback(
691 opts_clone.session_id.clone(),
692 perm_request,
693 opts_clone.access_token.clone(),
694 );
695 }
696 }
697 }
698 } else if obj.get("type").and_then(|v| v.as_str()) == Some("user") {
699 if let Some(text) = extract_user_message_text(&parsed) {
700 if let Some(ref callback) = opts_clone.on_first_user_message {
701 callback(text);
702 }
703 }
704 }
705 }
706 }
707 }
708 });
709 }
710
711 if let Some(stderr) = child.stderr.take() {
713 let last_stderr_clone = last_stderr.clone();
714 let on_debug_clone = on_debug.clone();
715
716 thread::spawn(move || {
717 let reader = BufReader::new(stderr);
718 for line in reader.lines().map_while(Result::ok) {
719 if verbose {
721 eprintln!("{}", line);
722 }
723
724 if let Ok(mut stderr_lines) = last_stderr_clone.lock() {
726 if stderr_lines.len() >= MAX_STDERR_LINES {
727 stderr_lines.pop_front();
728 }
729 stderr_lines.push_back(line.clone());
730 }
731
732 on_debug_clone(line);
733 }
734 });
735 }
736
737 let session_id_clone = session_id.clone();
739 let on_debug_clone = on_debug.clone();
740 let done_status_clone = done_status.clone();
741 let child_for_handle = Arc::new(Mutex::new(Some(child)));
742 let child_for_thread = child_for_handle.clone();
743
744 thread::spawn(move || {
745 let mut child_guard = child_for_thread.lock().unwrap();
746 if let Some(ref mut child) = *child_guard {
747 let status = child.wait();
748 let on_debug = on_debug_clone;
749
750 match status {
751 Ok(exit_status) => {
752 let code = exit_status.code().unwrap_or(-1);
755 if code == 15 || code == 2 || code == -11 {
756 on_debug(format!(
757 "[bridge:session] sessionId={} interrupted exit_code={} pid={}",
758 session_id_clone,
759 code,
760 child.id()
761 ));
762 if let Ok(mut status) = done_status_clone.lock() {
763 *status = Some(SessionDoneStatus::Interrupted);
764 }
765 } else if exit_status.success() {
766 on_debug(format!(
767 "[bridge:session] sessionId={} completed exit_code=0 pid={}",
768 session_id_clone,
769 child.id()
770 ));
771 if let Ok(mut status) = done_status_clone.lock() {
772 *status = Some(SessionDoneStatus::Completed);
773 }
774 } else {
775 on_debug(format!(
776 "[bridge:session] sessionId={} failed exit_code={:?} pid={}",
777 session_id_clone,
778 exit_status.code(),
779 child.id()
780 ));
781 if let Ok(mut status) = done_status_clone.lock() {
782 *status = Some(SessionDoneStatus::Failed);
783 }
784 }
785 }
786 Err(e) => {
787 on_debug(format!(
788 "[bridge:session] sessionId={} wait error: {}",
789 session_id_clone, e
790 ));
791 if let Ok(mut status) = done_status_clone.lock() {
792 *status = Some(SessionDoneStatus::Failed);
793 }
794 }
795 }
796 }
797 });
798
799 SessionHandle {
800 session_id: opts.session_id,
801 done: done_status,
802 activities,
803 current_activity,
804 access_token: Arc::new(Mutex::new(opts.access_token)),
805 last_stderr,
806 child: child_for_handle,
807 stdin: Arc::new(Mutex::new(stdin)),
808 killed: Arc::new(Mutex::new(false)),
809 sigkill_sent: Arc::new(Mutex::new(false)),
810 }
811 }
812}
813
/// Returns `id` with every character other than an alphanumeric, `-` or `_`
/// replaced by `_`, so the result contains no path separators or dots.
pub fn safe_filename_id(id: &str) -> String {
    let mut sanitized = String::with_capacity(id.len());
    for ch in id.chars() {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
827
#[cfg(test)]
mod tests {
    use super::*;

    /// Safe ids pass through untouched; unsafe characters become `_`.
    #[test]
    fn test_safe_filename_id() {
        let cases = [
            ("session_abc123", "session_abc123"),
            ("cse_abc-123", "cse_abc-123"),
            ("../etc/passwd", "___etc_passwd"),
        ];
        for &(raw, expected) in cases.iter() {
            assert_eq!(safe_filename_id(raw), expected);
        }
    }

    /// The summary is the tool's verb followed by its primary argument.
    #[test]
    fn test_tool_summary() {
        let read_input = serde_json::json!({ "file_path": "/path/to/file.txt" });
        let read_summary = tool_summary("Read", &read_input);
        assert_eq!(read_summary, "Reading /path/to/file.txt");

        let bash_input = serde_json::json!({ "command": "ls -la" });
        let bash_summary = tool_summary("Bash", &bash_input);
        assert_eq!(bash_summary, "Running ls -la");
    }

    /// String-valued keys show up in the preview as `key="value"` pairs.
    #[test]
    fn test_input_preview() {
        let tool_input = serde_json::json!({
            "file_path": "/test.txt",
            "content": "hello world"
        });
        let rendered = input_preview(&tool_input);
        assert!(rendered.contains("file_path="));
    }
}