1use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use crate::bridge::SDKMessage;
18
19pub fn is_sdk_message(value: &serde_json::Value) -> bool {
27 value.get("type").and_then(|v| v.as_str()).is_some()
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SDKControlResponse {
33 #[serde(rename = "type")]
34 pub response_type: String,
35 pub response: SDKControlResponsePayload,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SDKControlResponsePayload {
40 #[serde(rename = "subtype")]
41 pub response_subtype: String,
42 #[serde(rename = "request_id")]
43 pub request_id: String,
44 pub error: Option<String>,
45 pub response: Option<serde_json::Value>,
46}
47
48impl SDKControlResponse {
49 pub fn new(subtype: &str, request_id: &str) -> Self {
50 Self {
51 response_type: "control_response".to_string(),
52 response: SDKControlResponsePayload {
53 response_subtype: subtype.to_string(),
54 request_id: request_id.to_string(),
55 error: None,
56 response: None,
57 },
58 }
59 }
60
61 pub fn success(request_id: &str) -> Self {
62 Self::new("success", request_id)
63 }
64
65 pub fn error(request_id: &str, error: &str) -> Self {
66 Self {
67 response_type: "control_response".to_string(),
68 response: SDKControlResponsePayload {
69 response_subtype: "error".to_string(),
70 request_id: request_id.to_string(),
71 error: Some(error.to_string()),
72 response: None,
73 },
74 }
75 }
76}
77
78pub fn is_sdk_control_response(value: &serde_json::Value) -> bool {
79 value
80 .get("type")
81 .and_then(|v| v.as_str())
82 .map(|s| s == "control_response")
83 .unwrap_or(false)
84 && value.get("response").is_some()
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum SDKControlRequest {
91 ControlRequest {
92 request_id: String,
93 request: SDKControlRequestPayload,
94 },
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SDKControlRequestPayload {
99 #[serde(rename = "subtype")]
100 pub request_subtype: String,
101 pub model: Option<String>,
102 #[serde(rename = "max_thinking_tokens")]
103 pub max_thinking_tokens: Option<u32>,
104 pub mode: Option<String>,
105}
106
107pub fn is_sdk_control_request(value: &serde_json::Value) -> bool {
108 value
109 .get("type")
110 .and_then(|v| v.as_str())
111 .map(|s| s == "control_request")
112 .unwrap_or(false)
113 && value.get("request_id").is_some()
114 && value.get("request").is_some()
115}
116
/// Coarse classification of a conversation message.
///
/// The enum is fieldless, so it is cheap to copy and compare; `Copy`,
/// `PartialEq`, `Eq` and `Hash` are derived so callers can pass it by value,
/// use `==`, and key collections with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessageType {
    /// Message authored by the user.
    User,
    /// Message authored by the assistant.
    Assistant,
    /// System-level message.
    System,
    /// Tool invocation.
    ToolUse,
    /// Result of a tool invocation.
    ToolResult,
}
130
131pub fn is_eligible_bridge_message(
135 msg_type: &MessageType,
136 is_virtual: bool,
137 system_subtype: Option<&str>,
138) -> bool {
139 if matches!(msg_type, MessageType::User | MessageType::Assistant) && is_virtual {
142 return false;
143 }
144 matches!(msg_type, MessageType::User | MessageType::Assistant)
145 || (matches!(msg_type, MessageType::System) && system_subtype == Some("local_command"))
146}
147
148pub fn extract_title_text(
158 msg_type: &MessageType,
159 is_meta: bool,
160 tool_use_result: bool,
161 is_compact_summary: bool,
162 origin_kind: Option<&str>,
163 content: &str,
164) -> Option<String> {
165 if !matches!(msg_type, MessageType::User) || is_meta || tool_use_result || is_compact_summary {
167 return None;
168 }
169
170 if let Some(kind) = origin_kind {
172 if kind != "human" {
173 return None;
174 }
175 }
176
177 if content.is_empty() {
179 return None;
180 }
181
182 let clean = strip_display_tags_allow_empty(content);
184 if clean.is_empty() { None } else { Some(clean) }
185}
186
/// Returns an owned copy of `s`.
///
/// NOTE(review): despite the name, nothing is stripped here — the input is
/// returned verbatim. This looks like a placeholder for real tag removal;
/// confirm the intended tag syntax before implementing it.
fn strip_display_tags_allow_empty(s: &str) -> String {
    String::from(s)
}
193
194pub type OnInboundMessage = Arc<dyn Fn(SDKMessage) + Send + Sync>;
200pub type OnPermissionResponse = Arc<dyn Fn(SDKControlResponse) + Send + Sync>;
201pub type OnControlRequest = Arc<dyn Fn(SDKControlRequest) + Send + Sync>;
202
203pub fn handle_ingress_message(
208 data: &str,
209 recent_posted_uuids: &mut BoundedUuidSet,
210 recent_inbound_uuids: &mut BoundedUuidSet,
211 on_inbound_message: Option<&OnInboundMessage>,
212 on_permission_response: Option<&OnPermissionResponse>,
213 on_control_request: Option<&OnControlRequest>,
214 log_for_debugging: &dyn Fn(&str),
215) {
216 let parsed: serde_json::Value = match serde_json::from_str(data) {
218 Ok(v) => v,
219 Err(err) => {
220 log_for_debugging(&format!(
221 "[bridge:repl] Failed to parse ingress message: {}",
222 err
223 ));
224 return;
225 }
226 };
227
228 if is_sdk_control_response(&parsed) {
230 log_for_debugging("[bridge:repl] Ingress message type=control_response");
231 if let Some(callback) = on_permission_response {
232 if let Ok(response) = serde_json::from_value::<SDKControlResponse>(parsed.clone()) {
233 callback(response);
234 }
235 }
236 return;
237 }
238
239 if is_sdk_control_request(&parsed) {
242 let subtype = parsed
243 .get("request")
244 .and_then(|r| r.get("subtype"))
245 .and_then(|v| v.as_str())
246 .unwrap_or("unknown");
247 log_for_debugging(&format!(
248 "[bridge:repl] Inbound control_request subtype={}",
249 subtype
250 ));
251 if let Some(callback) = on_control_request {
252 if let Ok(request) = serde_json::from_value::<SDKControlRequest>(parsed.clone()) {
253 callback(request);
254 }
255 }
256 return;
257 }
258
259 if !is_sdk_message(&parsed) {
260 return;
261 }
262
263 let uuid = parsed.get("uuid").and_then(|v| v.as_str());
265
266 if let Some(uuid_str) = uuid {
267 if recent_posted_uuids.contains(uuid_str) {
268 let msg_type = parsed
269 .get("type")
270 .and_then(|v| v.as_str())
271 .unwrap_or("unknown");
272 log_for_debugging(&format!(
273 "[bridge:repl] Ignoring echo: type={} uuid={}",
274 msg_type, uuid_str
275 ));
276 return;
277 }
278
279 if recent_inbound_uuids.contains(uuid_str) {
281 let msg_type = parsed
282 .get("type")
283 .and_then(|v| v.as_str())
284 .unwrap_or("unknown");
285 log_for_debugging(&format!(
286 "[bridge:repl] Ignoring re-delivered inbound: type={} uuid={}",
287 msg_type, uuid_str
288 ));
289 return;
290 }
291 }
292
293 let msg_type = parsed
294 .get("type")
295 .and_then(|v| v.as_str())
296 .unwrap_or("unknown");
297 let uuid_suffix = uuid.map(|u| format!(" uuid={}", u)).unwrap_or_default();
298 log_for_debugging(&format!(
299 "[bridge:repl] Ingress message type={}{}",
300 msg_type, uuid_suffix
301 ));
302
303 if msg_type == "user" {
304 if let Some(uuid_str) = uuid {
305 recent_inbound_uuids.add(uuid_str.to_string());
306 }
307 if let Some(callback) = on_inbound_message {
309 if let Ok(msg) = serde_json::from_value::<SDKMessage>(parsed.clone()) {
310 callback(msg);
311 }
312 }
313 } else {
314 log_for_debugging(&format!(
315 "[bridge:repl] Ignoring non-user inbound message: type={}",
316 msg_type
317 ));
318 }
319}
320
321pub struct ServerControlRequestHandlers {
327 pub transport: Option<Box<dyn ReplBridgeTransport + Send>>,
328 pub session_id: String,
329 pub outbound_only: bool,
335 pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
336 pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
337 pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
338 pub on_set_permission_mode: Option<Arc<dyn Fn(String) -> Result<(), String> + Send + Sync>>,
339}
340
341pub trait ReplBridgeTransport {
343 fn write(&self, event: serde_json::Value) -> Result<(), String>;
344}
345
/// Error text returned to the requester when a control request is rejected
/// because the session only allows outbound traffic.
const OUTBOUND_ONLY_ERROR: &str =
    "This session is outbound-only. Enable Remote Control locally to allow inbound control.";
348
349pub fn handle_server_control_request(
354 request: &SDKControlRequest,
355 handlers: &ServerControlRequestHandlers,
356 log_for_debugging: &dyn Fn(&str),
357) {
358 let ServerControlRequestHandlers {
359 transport,
360 session_id,
361 outbound_only,
362 on_interrupt,
363 on_set_model,
364 on_set_max_thinking_tokens,
365 on_set_permission_mode,
366 } = handlers;
367
368 let Some(transport) = transport else {
369 log_for_debugging(
370 "[bridge:repl] Cannot respond to control_request: transport not configured",
371 );
372 return;
373 };
374
375 let SDKControlRequest::ControlRequest {
376 request_id,
377 request: request_payload,
378 } = request
379 else {
380 return;
381 };
382
383 let request_subtype = &request_payload.request_subtype;
384
385 let response: SDKControlResponse;
386
387 if *outbound_only && request_subtype != "initialize" {
391 response = SDKControlResponse {
392 response_type: "control_response".to_string(),
393 response: SDKControlResponsePayload {
394 response_subtype: "error".to_string(),
395 request_id: request_id.clone(),
396 error: Some(OUTBOUND_ONLY_ERROR.to_string()),
397 response: None,
398 },
399 };
400 let event = serde_json::json!({
401 "type": "control_response",
402 "response": response.response,
403 "session_id": session_id
404 });
405 let _ = transport.write(event);
406 log_for_debugging(&format!(
407 "[bridge:repl] Rejected {} (outbound-only) request_id={}",
408 request_subtype, request_id
409 ));
410 return;
411 }
412
413 match request_subtype.as_str() {
414 "initialize" => {
415 response = SDKControlResponse {
418 response_type: "control_response".to_string(),
419 response: SDKControlResponsePayload {
420 response_subtype: "success".to_string(),
421 request_id: request_id.clone(),
422 error: None,
423 response: Some(serde_json::json!({
424 "commands": [],
425 "output_style": "normal",
426 "available_output_styles": ["normal"],
427 "models": [],
428 "account": {},
429 "pid": std::process::id(),
430 })),
431 },
432 };
433 }
434 "set_model" => {
435 on_set_model
436 .as_ref()
437 .map(|cb| cb(request_payload.model.clone()));
438 response = SDKControlResponse {
439 response_type: "control_response".to_string(),
440 response: SDKControlResponsePayload {
441 response_subtype: "success".to_string(),
442 request_id: request_id.clone(),
443 error: None,
444 response: None,
445 },
446 };
447 }
448 "set_max_thinking_tokens" => {
449 on_set_max_thinking_tokens
450 .as_ref()
451 .map(|cb| cb(request_payload.max_thinking_tokens));
452 response = SDKControlResponse {
453 response_type: "control_response".to_string(),
454 response: SDKControlResponsePayload {
455 response_subtype: "success".to_string(),
456 request_id: request_id.clone(),
457 error: None,
458 response: None,
459 },
460 };
461 }
462 "set_permission_mode" => {
463 let mode = request_payload.mode.clone().unwrap_or_default();
470 let verdict = on_set_permission_mode
471 .as_ref()
472 .map(|cb| cb(mode.clone()))
473 .unwrap_or(Err(
474 "set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)".to_string()
475 ));
476
477 if verdict.is_ok() {
478 response = SDKControlResponse {
479 response_type: "control_response".to_string(),
480 response: SDKControlResponsePayload {
481 response_subtype: "success".to_string(),
482 request_id: request_id.clone(),
483 error: None,
484 response: None,
485 },
486 };
487 } else {
488 response = SDKControlResponse {
489 response_type: "control_response".to_string(),
490 response: SDKControlResponsePayload {
491 response_subtype: "error".to_string(),
492 request_id: request_id.clone(),
493 error: Some(verdict.err().unwrap_or_default()),
494 response: None,
495 },
496 };
497 }
498 }
499 "interrupt" => {
500 on_interrupt.as_ref().map(|cb| cb());
501 response = SDKControlResponse {
502 response_type: "control_response".to_string(),
503 response: SDKControlResponsePayload {
504 response_subtype: "success".to_string(),
505 request_id: request_id.clone(),
506 error: None,
507 response: None,
508 },
509 };
510 }
511 _ => {
512 response = SDKControlResponse {
515 response_type: "control_response".to_string(),
516 response: SDKControlResponsePayload {
517 response_subtype: "error".to_string(),
518 request_id: request_id.clone(),
519 error: Some(format!(
520 "REPL bridge does not handle control_request subtype: {}",
521 request_subtype
522 )),
523 response: None,
524 },
525 };
526 }
527 }
528
529 let event = serde_json::json!({
530 "type": "control_response",
531 "response": response.response,
532 "session_id": session_id
533 });
534 let _ = transport.write(event);
535 log_for_debugging(&format!(
536 "[bridge:repl] Sent control_response for {} request_id={} result={}",
537 request_subtype, request_id, request_payload.request_subtype
538 ));
539}
540
541#[derive(Debug, Clone, Serialize, Deserialize, Default)]
547pub struct EmptyUsage {
548 pub input_tokens: u32,
549 pub output_tokens: u32,
550 #[serde(rename = "cache_creation_input_tokens")]
551 pub cache_creation_input_tokens: u32,
552 #[serde(rename = "cache_hit_input_tokens")]
553 pub cache_hit_input_tokens: u32,
554}
555
556#[derive(Debug, Clone, Serialize, Deserialize)]
559pub struct SDKResultSuccess {
560 #[serde(rename = "type")]
561 pub result_type: String,
562 pub subtype: String,
563 #[serde(rename = "duration_ms")]
564 pub duration_ms: u64,
565 #[serde(rename = "duration_api_ms")]
566 pub duration_api_ms: u64,
567 #[serde(rename = "is_error")]
568 pub is_error: bool,
569 #[serde(rename = "num_turns")]
570 pub num_turns: u32,
571 pub result: String,
572 #[serde(rename = "stop_reason")]
573 pub stop_reason: Option<String>,
574 #[serde(rename = "total_cost_usd")]
575 pub total_cost_usd: f64,
576 pub usage: EmptyUsage,
577 #[serde(rename = "model_usage")]
578 pub model_usage: serde_json::Value,
579 #[serde(rename = "permission_denials")]
580 pub permission_denials: Vec<String>,
581 #[serde(rename = "session_id")]
582 pub session_id: String,
583 pub uuid: String,
584}
585
586pub fn make_result_message(session_id: &str) -> SDKResultSuccess {
587 SDKResultSuccess {
588 result_type: "result".to_string(),
589 subtype: "success".to_string(),
590 duration_ms: 0,
591 duration_api_ms: 0,
592 is_error: false,
593 num_turns: 0,
594 result: String::new(),
595 stop_reason: None,
596 total_cost_usd: 0.0,
597 usage: EmptyUsage {
598 input_tokens: 0,
599 output_tokens: 0,
600 cache_creation_input_tokens: 0,
601 cache_hit_input_tokens: 0,
602 },
603 model_usage: serde_json::json!({}),
604 permission_denials: vec![],
605 session_id: session_id.to_string(),
606 uuid: uuid::Uuid::new_v4().to_string(),
607 }
608}
609
/// Fixed-capacity set of UUID strings that forgets its oldest entry once full.
///
/// Entries are stored twice: in `ring`, which records insertion order and is
/// overwritten circularly, and in `set`, which answers membership queries.
/// The two are kept in sync by `add` and `clear`.
pub struct BoundedUuidSet {
    /// Maximum number of entries retained.
    capacity: usize,
    /// Insertion-ordered slots; always `capacity` long.
    ring: Vec<Option<String>>,
    /// Membership index over the entries currently held in `ring`.
    set: HashSet<String>,
    /// Slot that the next insertion overwrites.
    write_idx: usize,
}

impl BoundedUuidSet {
    /// Creates an empty set that retains at most `capacity` entries.
    ///
    /// A capacity of zero is allowed and yields a set that never stores
    /// anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            ring: vec![None; capacity],
            set: HashSet::with_capacity(capacity),
            write_idx: 0,
        }
    }

    /// Inserts `uuid`, evicting the oldest entry when the set is full.
    ///
    /// Does nothing when `uuid` is already present (its age is not refreshed)
    /// or when the capacity is zero.
    pub fn add(&mut self, uuid: String) {
        // The capacity check guards the slot index and the modulo below.
        if self.capacity == 0 || self.set.contains(&uuid) {
            return;
        }
        if let Some(evicted) = self.ring[self.write_idx].replace(uuid.clone()) {
            self.set.remove(&evicted);
        }
        self.set.insert(uuid);
        self.write_idx = (self.write_idx + 1) % self.capacity;
    }

    /// Reports whether `uuid` is currently retained.
    pub fn contains(&self, uuid: &str) -> bool {
        self.set.contains(uuid)
    }

    /// Removes every entry; the capacity is unchanged.
    pub fn clear(&mut self) {
        self.set.clear();
        self.ring.fill(None);
        self.write_idx = 0;
    }
}

impl Default for BoundedUuidSet {
    /// Creates a set that retains at most 100 entries.
    fn default() -> Self {
        Self::new(100)
    }
}