1use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use crate::bridge::SDKMessage;
18
19pub fn is_sdk_message(value: &serde_json::Value) -> bool {
27 value.get("type").and_then(|v| v.as_str()).is_some()
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SDKControlResponse {
33 #[serde(rename = "type")]
34 pub response_type: String,
35 pub response: SDKControlResponsePayload,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SDKControlResponsePayload {
40 #[serde(rename = "subtype")]
41 pub response_subtype: String,
42 #[serde(rename = "request_id")]
43 pub request_id: String,
44 pub error: Option<String>,
45 pub response: Option<serde_json::Value>,
46}
47
48impl SDKControlResponse {
49 pub fn new(subtype: &str, request_id: &str) -> Self {
50 Self {
51 response_type: "control_response".to_string(),
52 response: SDKControlResponsePayload {
53 response_subtype: subtype.to_string(),
54 request_id: request_id.to_string(),
55 error: None,
56 response: None,
57 },
58 }
59 }
60
61 pub fn success(request_id: &str) -> Self {
62 Self::new("success", request_id)
63 }
64
65 pub fn error(request_id: &str, error: &str) -> Self {
66 Self {
67 response_type: "control_response".to_string(),
68 response: SDKControlResponsePayload {
69 response_subtype: "error".to_string(),
70 request_id: request_id.to_string(),
71 error: Some(error.to_string()),
72 response: None,
73 },
74 }
75 }
76}
77
78pub fn is_sdk_control_response(value: &serde_json::Value) -> bool {
79 value
80 .get("type")
81 .and_then(|v| v.as_str())
82 .map(|s| s == "control_response")
83 .unwrap_or(false)
84 && value.get("response").is_some()
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum SDKControlRequest {
91 ControlRequest {
92 request_id: String,
93 request: SDKControlRequestPayload,
94 },
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SDKControlRequestPayload {
99 #[serde(rename = "subtype")]
100 pub request_subtype: String,
101 pub model: Option<String>,
102 #[serde(rename = "max_thinking_tokens")]
103 pub max_thinking_tokens: Option<u32>,
104 pub mode: Option<String>,
105}
106
107pub fn is_sdk_control_request(value: &serde_json::Value) -> bool {
108 value
109 .get("type")
110 .and_then(|v| v.as_str())
111 .map(|s| s == "control_request")
112 .unwrap_or(false)
113 && value.get("request_id").is_some()
114 && value.get("request").is_some()
115}
116
/// Kind of a conversation message, as consumed by
/// [`is_eligible_bridge_message`] and [`extract_title_text`].
///
/// The enum is fieldless, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    User,
    Assistant,
    System,
    ToolUse,
    ToolResult,
}
130
131pub fn is_eligible_bridge_message(
135 msg_type: &MessageType,
136 is_virtual: bool,
137 system_subtype: Option<&str>,
138) -> bool {
139 if matches!(msg_type, MessageType::User | MessageType::Assistant) && is_virtual {
142 return false;
143 }
144 matches!(msg_type, MessageType::User | MessageType::Assistant)
145 || (matches!(msg_type, MessageType::System) && system_subtype == Some("local_command"))
146}
147
148pub fn extract_title_text(
158 msg_type: &MessageType,
159 is_meta: bool,
160 tool_use_result: bool,
161 is_compact_summary: bool,
162 origin_kind: Option<&str>,
163 content: &str,
164) -> Option<String> {
165 if !matches!(msg_type, MessageType::User) || is_meta || tool_use_result || is_compact_summary {
167 return None;
168 }
169
170 if let Some(kind) = origin_kind {
172 if kind != "human" {
173 return None;
174 }
175 }
176
177 if content.is_empty() {
179 return None;
180 }
181
182 let clean = strip_display_tags_allow_empty(content);
184 if clean.is_empty() {
185 None
186 } else {
187 Some(clean)
188 }
189}
190
/// Returns an owned copy of `s`.
///
/// NOTE(review): despite the name, no tags are stripped here — the input is
/// returned unchanged. Confirm whether tag stripping is still to be implemented.
fn strip_display_tags_allow_empty(s: &str) -> String {
    String::from(s)
}
197
198pub type OnInboundMessage = Arc<dyn Fn(SDKMessage) + Send + Sync>;
204pub type OnPermissionResponse = Arc<dyn Fn(SDKControlResponse) + Send + Sync>;
205pub type OnControlRequest = Arc<dyn Fn(SDKControlRequest) + Send + Sync>;
206
207pub fn handle_ingress_message(
212 data: &str,
213 recent_posted_uuids: &mut BoundedUuidSet,
214 recent_inbound_uuids: &mut BoundedUuidSet,
215 on_inbound_message: Option<&OnInboundMessage>,
216 on_permission_response: Option<&OnPermissionResponse>,
217 on_control_request: Option<&OnControlRequest>,
218 log_for_debugging: &dyn Fn(&str),
219) {
220 let parsed: serde_json::Value = match serde_json::from_str(data) {
222 Ok(v) => v,
223 Err(err) => {
224 log_for_debugging(&format!(
225 "[bridge:repl] Failed to parse ingress message: {}",
226 err
227 ));
228 return;
229 }
230 };
231
232 if is_sdk_control_response(&parsed) {
234 log_for_debugging("[bridge:repl] Ingress message type=control_response");
235 if let Some(callback) = on_permission_response {
236 if let Ok(response) = serde_json::from_value::<SDKControlResponse>(parsed.clone()) {
237 callback(response);
238 }
239 }
240 return;
241 }
242
243 if is_sdk_control_request(&parsed) {
246 let subtype = parsed
247 .get("request")
248 .and_then(|r| r.get("subtype"))
249 .and_then(|v| v.as_str())
250 .unwrap_or("unknown");
251 log_for_debugging(&format!(
252 "[bridge:repl] Inbound control_request subtype={}",
253 subtype
254 ));
255 if let Some(callback) = on_control_request {
256 if let Ok(request) = serde_json::from_value::<SDKControlRequest>(parsed.clone()) {
257 callback(request);
258 }
259 }
260 return;
261 }
262
263 if !is_sdk_message(&parsed) {
264 return;
265 }
266
267 let uuid = parsed.get("uuid").and_then(|v| v.as_str());
269
270 if let Some(uuid_str) = uuid {
271 if recent_posted_uuids.contains(uuid_str) {
272 let msg_type = parsed
273 .get("type")
274 .and_then(|v| v.as_str())
275 .unwrap_or("unknown");
276 log_for_debugging(&format!(
277 "[bridge:repl] Ignoring echo: type={} uuid={}",
278 msg_type, uuid_str
279 ));
280 return;
281 }
282
283 if recent_inbound_uuids.contains(uuid_str) {
285 let msg_type = parsed
286 .get("type")
287 .and_then(|v| v.as_str())
288 .unwrap_or("unknown");
289 log_for_debugging(&format!(
290 "[bridge:repl] Ignoring re-delivered inbound: type={} uuid={}",
291 msg_type, uuid_str
292 ));
293 return;
294 }
295 }
296
297 let msg_type = parsed
298 .get("type")
299 .and_then(|v| v.as_str())
300 .unwrap_or("unknown");
301 let uuid_suffix = uuid.map(|u| format!(" uuid={}", u)).unwrap_or_default();
302 log_for_debugging(&format!(
303 "[bridge:repl] Ingress message type={}{}",
304 msg_type, uuid_suffix
305 ));
306
307 if msg_type == "user" {
308 if let Some(uuid_str) = uuid {
309 recent_inbound_uuids.add(uuid_str.to_string());
310 }
311 if let Some(callback) = on_inbound_message {
313 if let Ok(msg) = serde_json::from_value::<SDKMessage>(parsed.clone()) {
314 callback(msg);
315 }
316 }
317 } else {
318 log_for_debugging(&format!(
319 "[bridge:repl] Ignoring non-user inbound message: type={}",
320 msg_type
321 ));
322 }
323}
324
325pub struct ServerControlRequestHandlers {
331 pub transport: Option<Box<dyn ReplBridgeTransport + Send>>,
332 pub session_id: String,
333 pub outbound_only: bool,
339 pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
340 pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
341 pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
342 pub on_set_permission_mode: Option<Arc<dyn Fn(String) -> Result<(), String> + Send + Sync>>,
343}
344
345pub trait ReplBridgeTransport {
347 fn write(&self, event: serde_json::Value) -> Result<(), String>;
348}
349
/// Error text returned for inbound control requests when the session is outbound-only.
const OUTBOUND_ONLY_ERROR: &str =
    "This session is outbound-only. Enable Remote Control locally to allow inbound control.";
352
353pub fn handle_server_control_request(
358 request: &SDKControlRequest,
359 handlers: &ServerControlRequestHandlers,
360 log_for_debugging: &dyn Fn(&str),
361) {
362 let ServerControlRequestHandlers {
363 transport,
364 session_id,
365 outbound_only,
366 on_interrupt,
367 on_set_model,
368 on_set_max_thinking_tokens,
369 on_set_permission_mode,
370 } = handlers;
371
372 let Some(transport) = transport else {
373 log_for_debugging(
374 "[bridge:repl] Cannot respond to control_request: transport not configured",
375 );
376 return;
377 };
378
379 let SDKControlRequest::ControlRequest {
380 request_id,
381 request: request_payload,
382 } = request
383 else {
384 return;
385 };
386
387 let request_subtype = &request_payload.request_subtype;
388
389 let response: SDKControlResponse;
390
391 if *outbound_only && request_subtype != "initialize" {
395 response = SDKControlResponse {
396 response_type: "control_response".to_string(),
397 response: SDKControlResponsePayload {
398 response_subtype: "error".to_string(),
399 request_id: request_id.clone(),
400 error: Some(OUTBOUND_ONLY_ERROR.to_string()),
401 response: None,
402 },
403 };
404 let event = serde_json::json!({
405 "type": "control_response",
406 "response": response.response,
407 "session_id": session_id
408 });
409 let _ = transport.write(event);
410 log_for_debugging(&format!(
411 "[bridge:repl] Rejected {} (outbound-only) request_id={}",
412 request_subtype, request_id
413 ));
414 return;
415 }
416
417 match request_subtype.as_str() {
418 "initialize" => {
419 response = SDKControlResponse {
422 response_type: "control_response".to_string(),
423 response: SDKControlResponsePayload {
424 response_subtype: "success".to_string(),
425 request_id: request_id.clone(),
426 error: None,
427 response: Some(serde_json::json!({
428 "commands": [],
429 "output_style": "normal",
430 "available_output_styles": ["normal"],
431 "models": [],
432 "account": {},
433 "pid": std::process::id(),
434 })),
435 },
436 };
437 }
438 "set_model" => {
439 on_set_model
440 .as_ref()
441 .map(|cb| cb(request_payload.model.clone()));
442 response = SDKControlResponse {
443 response_type: "control_response".to_string(),
444 response: SDKControlResponsePayload {
445 response_subtype: "success".to_string(),
446 request_id: request_id.clone(),
447 error: None,
448 response: None,
449 },
450 };
451 }
452 "set_max_thinking_tokens" => {
453 on_set_max_thinking_tokens
454 .as_ref()
455 .map(|cb| cb(request_payload.max_thinking_tokens));
456 response = SDKControlResponse {
457 response_type: "control_response".to_string(),
458 response: SDKControlResponsePayload {
459 response_subtype: "success".to_string(),
460 request_id: request_id.clone(),
461 error: None,
462 response: None,
463 },
464 };
465 }
466 "set_permission_mode" => {
467 let mode = request_payload.mode.clone().unwrap_or_default();
474 let verdict = on_set_permission_mode
475 .as_ref()
476 .map(|cb| cb(mode.clone()))
477 .unwrap_or(Err(
478 "set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)".to_string()
479 ));
480
481 if verdict.is_ok() {
482 response = SDKControlResponse {
483 response_type: "control_response".to_string(),
484 response: SDKControlResponsePayload {
485 response_subtype: "success".to_string(),
486 request_id: request_id.clone(),
487 error: None,
488 response: None,
489 },
490 };
491 } else {
492 response = SDKControlResponse {
493 response_type: "control_response".to_string(),
494 response: SDKControlResponsePayload {
495 response_subtype: "error".to_string(),
496 request_id: request_id.clone(),
497 error: Some(verdict.err().unwrap_or_default()),
498 response: None,
499 },
500 };
501 }
502 }
503 "interrupt" => {
504 on_interrupt.as_ref().map(|cb| cb());
505 response = SDKControlResponse {
506 response_type: "control_response".to_string(),
507 response: SDKControlResponsePayload {
508 response_subtype: "success".to_string(),
509 request_id: request_id.clone(),
510 error: None,
511 response: None,
512 },
513 };
514 }
515 _ => {
516 response = SDKControlResponse {
519 response_type: "control_response".to_string(),
520 response: SDKControlResponsePayload {
521 response_subtype: "error".to_string(),
522 request_id: request_id.clone(),
523 error: Some(format!(
524 "REPL bridge does not handle control_request subtype: {}",
525 request_subtype
526 )),
527 response: None,
528 },
529 };
530 }
531 }
532
533 let event = serde_json::json!({
534 "type": "control_response",
535 "response": response.response,
536 "session_id": session_id
537 });
538 let _ = transport.write(event);
539 log_for_debugging(&format!(
540 "[bridge:repl] Sent control_response for {} request_id={} result={}",
541 request_subtype, request_id, request_payload.request_subtype
542 ));
543}
544
545#[derive(Debug, Clone, Serialize, Deserialize, Default)]
551pub struct EmptyUsage {
552 pub input_tokens: u32,
553 pub output_tokens: u32,
554 #[serde(rename = "cache_creation_input_tokens")]
555 pub cache_creation_input_tokens: u32,
556 #[serde(rename = "cache_hit_input_tokens")]
557 pub cache_hit_input_tokens: u32,
558}
559
560#[derive(Debug, Clone, Serialize, Deserialize)]
563pub struct SDKResultSuccess {
564 #[serde(rename = "type")]
565 pub result_type: String,
566 pub subtype: String,
567 #[serde(rename = "duration_ms")]
568 pub duration_ms: u64,
569 #[serde(rename = "duration_api_ms")]
570 pub duration_api_ms: u64,
571 #[serde(rename = "is_error")]
572 pub is_error: bool,
573 #[serde(rename = "num_turns")]
574 pub num_turns: u32,
575 pub result: String,
576 #[serde(rename = "stop_reason")]
577 pub stop_reason: Option<String>,
578 #[serde(rename = "total_cost_usd")]
579 pub total_cost_usd: f64,
580 pub usage: EmptyUsage,
581 #[serde(rename = "model_usage")]
582 pub model_usage: serde_json::Value,
583 #[serde(rename = "permission_denials")]
584 pub permission_denials: Vec<String>,
585 #[serde(rename = "session_id")]
586 pub session_id: String,
587 pub uuid: String,
588}
589
590pub fn make_result_message(session_id: &str) -> SDKResultSuccess {
591 SDKResultSuccess {
592 result_type: "result".to_string(),
593 subtype: "success".to_string(),
594 duration_ms: 0,
595 duration_api_ms: 0,
596 is_error: false,
597 num_turns: 0,
598 result: String::new(),
599 stop_reason: None,
600 total_cost_usd: 0.0,
601 usage: EmptyUsage {
602 input_tokens: 0,
603 output_tokens: 0,
604 cache_creation_input_tokens: 0,
605 cache_hit_input_tokens: 0,
606 },
607 model_usage: serde_json::json!({}),
608 permission_denials: vec![],
609 session_id: session_id.to_string(),
610 uuid: uuid::Uuid::new_v4().to_string(),
611 }
612}
613
/// Fixed-capacity set of UUID strings with first-in-first-out eviction.
///
/// `ring` stores entries in insertion order and `set` mirrors its contents
/// for membership tests. Once `capacity` entries are stored, each new entry
/// overwrites (and evicts) the oldest one.
#[derive(Debug)]
pub struct BoundedUuidSet {
    capacity: usize,
    ring: Vec<Option<String>>,
    set: HashSet<String>,
    write_idx: usize,
}

impl BoundedUuidSet {
    /// Creates an empty set holding at most `capacity` entries.
    ///
    /// A `capacity` of zero yields a set that never stores anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            ring: vec![None; capacity],
            set: HashSet::with_capacity(capacity),
            write_idx: 0,
        }
    }

    /// Inserts `uuid`, evicting the oldest entry when the set is full.
    ///
    /// Re-adding an entry that is already present is a no-op and does not
    /// refresh its position in the eviction order.
    pub fn add(&mut self, uuid: String) {
        // With zero capacity there is no slot to write to; indexing the ring
        // or taking `% capacity` would panic.
        if self.capacity == 0 || self.set.contains(&uuid) {
            return;
        }
        // Overwrite the oldest slot and drop whatever it held from the index.
        if let Some(evicted) = self.ring[self.write_idx].replace(uuid.clone()) {
            self.set.remove(&evicted);
        }
        self.set.insert(uuid);
        self.write_idx = (self.write_idx + 1) % self.capacity;
    }

    /// Returns `true` when `uuid` is currently stored.
    pub fn contains(&self, uuid: &str) -> bool {
        self.set.contains(uuid)
    }

    /// Removes every entry and resets the write position.
    pub fn clear(&mut self) {
        self.set.clear();
        self.ring.fill(None);
        self.write_idx = 0;
    }
}

impl Default for BoundedUuidSet {
    /// Creates a set with a capacity of 100 entries.
    fn default() -> Self {
        Self::new(100)
    }
}