Skip to main content

ai_agent/bridge/
bridge_messaging.rs

1//! Shared transport-layer helpers for bridge message handling.
2//!
3//! Translated from openclaudecode/src/bridge/bridgeMain.ts
4//!
5//! Extracted from replBridge.ts so both the env-based core (initBridgeCore)
6//! and the env-less core (initEnvLessBridgeCore) can use the same ingress
7//! parsing, control-request handling, and echo-dedup machinery.
8//!
9//! Everything here is pure — no closure over bridge-specific state. All
10//! collaborators (transport, sessionId, UUID sets, callbacks) are passed
11//! as params.
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use crate::bridge::SDKMessage;
18
19// =============================================================================
20// TYPE GUARDS
21// =============================================================================
22
23/// Type predicate for parsed WebSocket messages. SDKMessage is a
24/// discriminated union on `type` — validating the discriminant is
25/// sufficient for the predicate; callers narrow further via the union.
26pub fn is_sdk_message(value: &serde_json::Value) -> bool {
27    value.get("type").and_then(|v| v.as_str()).is_some()
28}
29
30/// Type predicate for control_response messages from the server.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SDKControlResponse {
33    #[serde(rename = "type")]
34    pub response_type: String,
35    pub response: SDKControlResponsePayload,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SDKControlResponsePayload {
40    #[serde(rename = "subtype")]
41    pub response_subtype: String,
42    #[serde(rename = "request_id")]
43    pub request_id: String,
44    pub error: Option<String>,
45    pub response: Option<serde_json::Value>,
46}
47
48impl SDKControlResponse {
49    pub fn new(subtype: &str, request_id: &str) -> Self {
50        Self {
51            response_type: "control_response".to_string(),
52            response: SDKControlResponsePayload {
53                response_subtype: subtype.to_string(),
54                request_id: request_id.to_string(),
55                error: None,
56                response: None,
57            },
58        }
59    }
60
61    pub fn success(request_id: &str) -> Self {
62        Self::new("success", request_id)
63    }
64
65    pub fn error(request_id: &str, error: &str) -> Self {
66        Self {
67            response_type: "control_response".to_string(),
68            response: SDKControlResponsePayload {
69                response_subtype: "error".to_string(),
70                request_id: request_id.to_string(),
71                error: Some(error.to_string()),
72                response: None,
73            },
74        }
75    }
76}
77
78pub fn is_sdk_control_response(value: &serde_json::Value) -> bool {
79    value
80        .get("type")
81        .and_then(|v| v.as_str())
82        .map(|s| s == "control_response")
83        .unwrap_or(false)
84        && value.get("response").is_some()
85}
86
87/// Type predicate for control_request messages from the server.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum SDKControlRequest {
91    ControlRequest {
92        request_id: String,
93        request: SDKControlRequestPayload,
94    },
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SDKControlRequestPayload {
99    #[serde(rename = "subtype")]
100    pub request_subtype: String,
101    pub model: Option<String>,
102    #[serde(rename = "max_thinking_tokens")]
103    pub max_thinking_tokens: Option<u32>,
104    pub mode: Option<String>,
105}
106
107pub fn is_sdk_control_request(value: &serde_json::Value) -> bool {
108    value
109        .get("type")
110        .and_then(|v| v.as_str())
111        .map(|s| s == "control_request")
112        .unwrap_or(false)
113        && value.get("request_id").is_some()
114        && value.get("request").is_some()
115}
116
117// =============================================================================
118// MESSAGE ELIGIBILITY
119// =============================================================================
120
121/// Message type for internal representation
122#[derive(Debug, Clone)]
123pub enum MessageType {
124    User,
125    Assistant,
126    System,
127    ToolUse,
128    ToolResult,
129}
130
131/// Check if a message type should be forwarded to the bridge transport.
132/// The server only wants user/assistant turns and slash-command system events;
133/// everything else (tool_result, progress, etc.) is internal REPL chatter.
134pub fn is_eligible_bridge_message(
135    msg_type: &MessageType,
136    is_virtual: bool,
137    system_subtype: Option<&str>,
138) -> bool {
139    // Virtual messages (REPL inner calls) are display-only — bridge/SDK
140    // consumers see the REPL tool_use/result which summarizes the work.
141    if matches!(msg_type, MessageType::User | MessageType::Assistant) && is_virtual {
142        return false;
143    }
144    matches!(msg_type, MessageType::User | MessageType::Assistant)
145        || (matches!(msg_type, MessageType::System) && system_subtype == Some("local_command"))
146}
147
148// =============================================================================
149// TITLE TEXT EXTRACTION
150// =============================================================================
151
152/// Extract title-worthy text from a Message for onUserMessage. Returns
153/// None for messages that shouldn't title the session: non-user, meta
154/// (nudges), tool results, compact summaries, non-human origins (task
155/// notifications, channel messages), or pure display-tag content
156/// (<ide_opened_file>, <session-start-hook>, etc.).
157pub fn extract_title_text(
158    msg_type: &MessageType,
159    is_meta: bool,
160    tool_use_result: bool,
161    is_compact_summary: bool,
162    origin_kind: Option<&str>,
163    content: &str,
164) -> Option<String> {
165    // Filter out non-user, meta, tool results, compact summaries
166    if !matches!(msg_type, MessageType::User) || is_meta || tool_use_result || is_compact_summary {
167        return None;
168    }
169
170    // Filter out non-human origins
171    if let Some(kind) = origin_kind {
172        if kind != "human" {
173            return None;
174        }
175    }
176
177    // Extract text content
178    if content.is_empty() {
179        return None;
180    }
181
182    // Strip display tags (simplified - would need full implementation)
183    let clean = strip_display_tags_allow_empty(content);
184    if clean.is_empty() { None } else { Some(clean) }
185}
186
187/// Strip display tags from text (simplified implementation).
188fn strip_display_tags_allow_empty(s: &str) -> String {
189    // Simplified: just return the input for now
190    // Full implementation would strip <ide_opened_file>, <session-start-hook>, etc.
191    s.to_string()
192}
193
194// =============================================================================
195// INGRESS ROUTING
196// =============================================================================
197
198/// Ingress message handler callback types
199pub type OnInboundMessage = Arc<dyn Fn(SDKMessage) + Send + Sync>;
200pub type OnPermissionResponse = Arc<dyn Fn(SDKControlResponse) + Send + Sync>;
201pub type OnControlRequest = Arc<dyn Fn(SDKControlRequest) + Send + Sync>;
202
203/// Parse an ingress WebSocket message and route it to the appropriate handler.
204/// Ignores messages whose UUID is in recentPostedUUIDs (echoes of what we sent)
205/// or in recentInboundUUIDs (re-deliveries we've already forwarded — e.g.
206/// server replayed history after a transport swap lost the seq-num cursor).
207pub fn handle_ingress_message(
208    data: &str,
209    recent_posted_uuids: &mut BoundedUuidSet,
210    recent_inbound_uuids: &mut BoundedUuidSet,
211    on_inbound_message: Option<&OnInboundMessage>,
212    on_permission_response: Option<&OnPermissionResponse>,
213    on_control_request: Option<&OnControlRequest>,
214    log_for_debugging: &dyn Fn(&str),
215) {
216    // Parse the JSON data
217    let parsed: serde_json::Value = match serde_json::from_str(data) {
218        Ok(v) => v,
219        Err(err) => {
220            log_for_debugging(&format!(
221                "[bridge:repl] Failed to parse ingress message: {}",
222                err
223            ));
224            return;
225        }
226    };
227
228    // control_response is not an SDKMessage — check before the type guard
229    if is_sdk_control_response(&parsed) {
230        log_for_debugging("[bridge:repl] Ingress message type=control_response");
231        if let Some(callback) = on_permission_response {
232            if let Ok(response) = serde_json::from_value::<SDKControlResponse>(parsed.clone()) {
233                callback(response);
234            }
235        }
236        return;
237    }
238
239    // control_request from the server (initialize, set_model, can_use_tool).
240    // Must respond promptly or the server kills the WS (~10-14s timeout).
241    if is_sdk_control_request(&parsed) {
242        let subtype = parsed
243            .get("request")
244            .and_then(|r| r.get("subtype"))
245            .and_then(|v| v.as_str())
246            .unwrap_or("unknown");
247        log_for_debugging(&format!(
248            "[bridge:repl] Inbound control_request subtype={}",
249            subtype
250        ));
251        if let Some(callback) = on_control_request {
252            if let Ok(request) = serde_json::from_value::<SDKControlRequest>(parsed.clone()) {
253                callback(request);
254            }
255        }
256        return;
257    }
258
259    if !is_sdk_message(&parsed) {
260        return;
261    }
262
263    // Check for UUID to detect echoes of our own messages
264    let uuid = parsed.get("uuid").and_then(|v| v.as_str());
265
266    if let Some(uuid_str) = uuid {
267        if recent_posted_uuids.contains(uuid_str) {
268            let msg_type = parsed
269                .get("type")
270                .and_then(|v| v.as_str())
271                .unwrap_or("unknown");
272            log_for_debugging(&format!(
273                "[bridge:repl] Ignoring echo: type={} uuid={}",
274                msg_type, uuid_str
275            ));
276            return;
277        }
278
279        // Defensive dedup: drop inbound prompts we've already forwarded.
280        if recent_inbound_uuids.contains(uuid_str) {
281            let msg_type = parsed
282                .get("type")
283                .and_then(|v| v.as_str())
284                .unwrap_or("unknown");
285            log_for_debugging(&format!(
286                "[bridge:repl] Ignoring re-delivered inbound: type={} uuid={}",
287                msg_type, uuid_str
288            ));
289            return;
290        }
291    }
292
293    let msg_type = parsed
294        .get("type")
295        .and_then(|v| v.as_str())
296        .unwrap_or("unknown");
297    let uuid_suffix = uuid.map(|u| format!(" uuid={}", u)).unwrap_or_default();
298    log_for_debugging(&format!(
299        "[bridge:repl] Ingress message type={}{}",
300        msg_type, uuid_suffix
301    ));
302
303    if msg_type == "user" {
304        if let Some(uuid_str) = uuid {
305            recent_inbound_uuids.add(uuid_str.to_string());
306        }
307        // Fire-and-forget — handler may be async (attachment resolution).
308        if let Some(callback) = on_inbound_message {
309            if let Ok(msg) = serde_json::from_value::<SDKMessage>(parsed.clone()) {
310                callback(msg);
311            }
312        }
313    } else {
314        log_for_debugging(&format!(
315            "[bridge:repl] Ignoring non-user inbound message: type={}",
316            msg_type
317        ));
318    }
319}
320
321// =============================================================================
322// SERVER-INITIATED CONTROL REQUESTS
323// =============================================================================
324
325/// Server control request handlers
326pub struct ServerControlRequestHandlers {
327    pub transport: Option<Box<dyn ReplBridgeTransport + Send>>,
328    pub session_id: String,
329    /// When true, all mutable requests (interrupt, set_model, set_permission_mode,
330    /// set_max_thinking_tokens) reply with an error instead of false-success.
331    /// initialize still replies success — the server kills the connection otherwise.
332    /// Used by the outbound-only bridge mode and the SDK's /bridge subpath so claude.ai sees a
333    /// proper error instead of "action succeeded but nothing happened locally".
334    pub outbound_only: bool,
335    pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
336    pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
337    pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
338    pub on_set_permission_mode: Option<Arc<dyn Fn(String) -> Result<(), String> + Send + Sync>>,
339}
340
341/// Trait for bridge transport
342pub trait ReplBridgeTransport {
343    fn write(&self, event: serde_json::Value) -> Result<(), String>;
344}
345
346const OUTBOUND_ONLY_ERROR: &str =
347    "This session is outbound-only. Enable Remote Control locally to allow inbound control.";
348
349/// Respond to inbound control_request messages from the server. The server
350/// sends these for session lifecycle events (initialize, set_model) and
351/// for turn-level coordination (interrupt, set_max_thinking_tokens). If we
352/// don't respond, the server hangs and kills the WS after ~10-14s.
353pub fn handle_server_control_request(
354    request: &SDKControlRequest,
355    handlers: &ServerControlRequestHandlers,
356    log_for_debugging: &dyn Fn(&str),
357) {
358    let ServerControlRequestHandlers {
359        transport,
360        session_id,
361        outbound_only,
362        on_interrupt,
363        on_set_model,
364        on_set_max_thinking_tokens,
365        on_set_permission_mode,
366    } = handlers;
367
368    let Some(transport) = transport else {
369        log_for_debugging(
370            "[bridge:repl] Cannot respond to control_request: transport not configured",
371        );
372        return;
373    };
374
375    let SDKControlRequest::ControlRequest {
376        request_id,
377        request: request_payload,
378    } = request
379    else {
380        return;
381    };
382
383    let request_subtype = &request_payload.request_subtype;
384
385    let response: SDKControlResponse;
386
387    // Outbound-only: reply error for mutable requests so claude.ai doesn't show
388    // false success. initialize must still succeed (server kills the connection
389    // if it doesn't — see comment above).
390    if *outbound_only && request_subtype != "initialize" {
391        response = SDKControlResponse {
392            response_type: "control_response".to_string(),
393            response: SDKControlResponsePayload {
394                response_subtype: "error".to_string(),
395                request_id: request_id.clone(),
396                error: Some(OUTBOUND_ONLY_ERROR.to_string()),
397                response: None,
398            },
399        };
400        let event = serde_json::json!({
401            "type": "control_response",
402            "response": response.response,
403            "session_id": session_id
404        });
405        let _ = transport.write(event);
406        log_for_debugging(&format!(
407            "[bridge:repl] Rejected {} (outbound-only) request_id={}",
408            request_subtype, request_id
409        ));
410        return;
411    }
412
413    match request_subtype.as_str() {
414        "initialize" => {
415            // Respond with minimal capabilities — the REPL handles
416            // commands, models, and account info itself.
417            response = SDKControlResponse {
418                response_type: "control_response".to_string(),
419                response: SDKControlResponsePayload {
420                    response_subtype: "success".to_string(),
421                    request_id: request_id.clone(),
422                    error: None,
423                    response: Some(serde_json::json!({
424                        "commands": [],
425                        "output_style": "normal",
426                        "available_output_styles": ["normal"],
427                        "models": [],
428                        "account": {},
429                        "pid": std::process::id(),
430                    })),
431                },
432            };
433        }
434        "set_model" => {
435            on_set_model
436                .as_ref()
437                .map(|cb| cb(request_payload.model.clone()));
438            response = SDKControlResponse {
439                response_type: "control_response".to_string(),
440                response: SDKControlResponsePayload {
441                    response_subtype: "success".to_string(),
442                    request_id: request_id.clone(),
443                    error: None,
444                    response: None,
445                },
446            };
447        }
448        "set_max_thinking_tokens" => {
449            on_set_max_thinking_tokens
450                .as_ref()
451                .map(|cb| cb(request_payload.max_thinking_tokens));
452            response = SDKControlResponse {
453                response_type: "control_response".to_string(),
454                response: SDKControlResponsePayload {
455                    response_subtype: "success".to_string(),
456                    request_id: request_id.clone(),
457                    error: None,
458                    response: None,
459                },
460            };
461        }
462        "set_permission_mode" => {
463            // The callback returns a policy verdict so we can send an error
464            // control_response without importing isAutoModeGateEnabled /
465            // isBypassPermissionsModeDisabled here (bootstrap-isolation). If no
466            // callback is registered (daemon context, which doesn't wire this),
467            // return an error verdict rather than a silent false-success: the mode
468            // is never actually applied in that context, so success would lie to the client.
469            let mode = request_payload.mode.clone().unwrap_or_default();
470            let verdict = on_set_permission_mode
471                .as_ref()
472                .map(|cb| cb(mode.clone()))
473                .unwrap_or(Err(
474                    "set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)".to_string()
475                ));
476
477            if verdict.is_ok() {
478                response = SDKControlResponse {
479                    response_type: "control_response".to_string(),
480                    response: SDKControlResponsePayload {
481                        response_subtype: "success".to_string(),
482                        request_id: request_id.clone(),
483                        error: None,
484                        response: None,
485                    },
486                };
487            } else {
488                response = SDKControlResponse {
489                    response_type: "control_response".to_string(),
490                    response: SDKControlResponsePayload {
491                        response_subtype: "error".to_string(),
492                        request_id: request_id.clone(),
493                        error: Some(verdict.err().unwrap_or_default()),
494                        response: None,
495                    },
496                };
497            }
498        }
499        "interrupt" => {
500            on_interrupt.as_ref().map(|cb| cb());
501            response = SDKControlResponse {
502                response_type: "control_response".to_string(),
503                response: SDKControlResponsePayload {
504                    response_subtype: "success".to_string(),
505                    request_id: request_id.clone(),
506                    error: None,
507                    response: None,
508                },
509            };
510        }
511        _ => {
512            // Unknown subtype — respond with error so the server doesn't
513            // hang waiting for a reply that never comes.
514            response = SDKControlResponse {
515                response_type: "control_response".to_string(),
516                response: SDKControlResponsePayload {
517                    response_subtype: "error".to_string(),
518                    request_id: request_id.clone(),
519                    error: Some(format!(
520                        "REPL bridge does not handle control_request subtype: {}",
521                        request_subtype
522                    )),
523                    response: None,
524                },
525            };
526        }
527    }
528
529    let event = serde_json::json!({
530        "type": "control_response",
531        "response": response.response,
532        "session_id": session_id
533    });
534    let _ = transport.write(event);
535    log_for_debugging(&format!(
536        "[bridge:repl] Sent control_response for {} request_id={} result={}",
537        request_subtype, request_id, request_payload.request_subtype
538    ));
539}
540
541// =============================================================================
542// RESULT MESSAGE (for session archival on teardown)
543// =============================================================================
544
545/// Empty usage for result message
546#[derive(Debug, Clone, Serialize, Deserialize, Default)]
547pub struct EmptyUsage {
548    pub input_tokens: u32,
549    pub output_tokens: u32,
550    #[serde(rename = "cache_creation_input_tokens")]
551    pub cache_creation_input_tokens: u32,
552    #[serde(rename = "cache_hit_input_tokens")]
553    pub cache_hit_input_tokens: u32,
554}
555
556/// Build a minimal `SDKResultSuccess` message for session archival.
557/// The server needs this event before a WS close to trigger archival.
558#[derive(Debug, Clone, Serialize, Deserialize)]
559pub struct SDKResultSuccess {
560    #[serde(rename = "type")]
561    pub result_type: String,
562    pub subtype: String,
563    #[serde(rename = "duration_ms")]
564    pub duration_ms: u64,
565    #[serde(rename = "duration_api_ms")]
566    pub duration_api_ms: u64,
567    #[serde(rename = "is_error")]
568    pub is_error: bool,
569    #[serde(rename = "num_turns")]
570    pub num_turns: u32,
571    pub result: String,
572    #[serde(rename = "stop_reason")]
573    pub stop_reason: Option<String>,
574    #[serde(rename = "total_cost_usd")]
575    pub total_cost_usd: f64,
576    pub usage: EmptyUsage,
577    #[serde(rename = "model_usage")]
578    pub model_usage: serde_json::Value,
579    #[serde(rename = "permission_denials")]
580    pub permission_denials: Vec<String>,
581    #[serde(rename = "session_id")]
582    pub session_id: String,
583    pub uuid: String,
584}
585
586pub fn make_result_message(session_id: &str) -> SDKResultSuccess {
587    SDKResultSuccess {
588        result_type: "result".to_string(),
589        subtype: "success".to_string(),
590        duration_ms: 0,
591        duration_api_ms: 0,
592        is_error: false,
593        num_turns: 0,
594        result: String::new(),
595        stop_reason: None,
596        total_cost_usd: 0.0,
597        usage: EmptyUsage {
598            input_tokens: 0,
599            output_tokens: 0,
600            cache_creation_input_tokens: 0,
601            cache_hit_input_tokens: 0,
602        },
603        model_usage: serde_json::json!({}),
604        permission_denials: vec![],
605        session_id: session_id.to_string(),
606        uuid: uuid::Uuid::new_v4().to_string(),
607    }
608}
609
610// =============================================================================
611// BOUNDED UUID SET (echo-dedup ring buffer)
612// =============================================================================
613
614/// FIFO-bounded set backed by a circular buffer. Evicts the oldest entry
615/// when capacity is reached, keeping memory usage constant at O(capacity).
616///
617/// Messages are added in chronological order, so evicted entries are always
618/// the oldest. The caller relies on external ordering (the hook's
619/// lastWrittenIndexRef) as the primary dedup — this set is a secondary
620/// safety net for echo filtering and race-condition dedup.
621pub struct BoundedUuidSet {
622    capacity: usize,
623    ring: Vec<Option<String>>,
624    set: HashSet<String>,
625    write_idx: usize,
626}
627
628impl BoundedUuidSet {
629    pub fn new(capacity: usize) -> Self {
630        Self {
631            capacity,
632            ring: vec![None; capacity],
633            set: HashSet::new(),
634            write_idx: 0,
635        }
636    }
637
638    pub fn add(&mut self, uuid: String) {
639        if self.set.contains(&uuid) {
640            return;
641        }
642        // Evict the entry at the current write position (if occupied)
643        if let Some(evicted) = self.ring[self.write_idx].take() {
644            self.set.remove(&evicted);
645        }
646        self.ring[self.write_idx] = Some(uuid.clone());
647        self.set.insert(uuid);
648        self.write_idx = (self.write_idx + 1) % self.capacity;
649    }
650
651    pub fn contains(&self, uuid: &str) -> bool {
652        self.set.contains(uuid)
653    }
654
655    pub fn clear(&mut self) {
656        self.set.clear();
657        for item in &mut self.ring {
658            *item = None;
659        }
660        self.write_idx = 0;
661    }
662}
663
664impl Default for BoundedUuidSet {
665    fn default() -> Self {
666        Self::new(100)
667    }
668}