Skip to main content

ai_agent/bridge/
bridge_messaging.rs

1//! Shared transport-layer helpers for bridge message handling.
2//!
3//! Translated from openclaudecode/src/bridge/bridgeMain.ts
4//!
5//! Extracted from replBridge.ts so both the env-based core (initBridgeCore)
6//! and the env-less core (initEnvLessBridgeCore) can use the same ingress
7//! parsing, control-request handling, and echo-dedup machinery.
8//!
9//! Everything here is pure — no closure over bridge-specific state. All
10//! collaborators (transport, sessionId, UUID sets, callbacks) are passed
11//! as params.
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashSet;
15use std::sync::Arc;
16
17use crate::bridge::SDKMessage;
18
19// =============================================================================
20// TYPE GUARDS
21// =============================================================================
22
23/// Type predicate for parsed WebSocket messages. SDKMessage is a
24/// discriminated union on `type` — validating the discriminant is
25/// sufficient for the predicate; callers narrow further via the union.
26pub fn is_sdk_message(value: &serde_json::Value) -> bool {
27    value.get("type").and_then(|v| v.as_str()).is_some()
28}
29
30/// Type predicate for control_response messages from the server.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SDKControlResponse {
33    #[serde(rename = "type")]
34    pub response_type: String,
35    pub response: SDKControlResponsePayload,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct SDKControlResponsePayload {
40    #[serde(rename = "subtype")]
41    pub response_subtype: String,
42    #[serde(rename = "request_id")]
43    pub request_id: String,
44    pub error: Option<String>,
45    pub response: Option<serde_json::Value>,
46}
47
48impl SDKControlResponse {
49    pub fn new(subtype: &str, request_id: &str) -> Self {
50        Self {
51            response_type: "control_response".to_string(),
52            response: SDKControlResponsePayload {
53                response_subtype: subtype.to_string(),
54                request_id: request_id.to_string(),
55                error: None,
56                response: None,
57            },
58        }
59    }
60
61    pub fn success(request_id: &str) -> Self {
62        Self::new("success", request_id)
63    }
64
65    pub fn error(request_id: &str, error: &str) -> Self {
66        Self {
67            response_type: "control_response".to_string(),
68            response: SDKControlResponsePayload {
69                response_subtype: "error".to_string(),
70                request_id: request_id.to_string(),
71                error: Some(error.to_string()),
72                response: None,
73            },
74        }
75    }
76}
77
78pub fn is_sdk_control_response(value: &serde_json::Value) -> bool {
79    value
80        .get("type")
81        .and_then(|v| v.as_str())
82        .map(|s| s == "control_response")
83        .unwrap_or(false)
84        && value.get("response").is_some()
85}
86
87/// Type predicate for control_request messages from the server.
88#[derive(Debug, Clone, Serialize, Deserialize)]
89#[serde(tag = "type", rename_all = "snake_case")]
90pub enum SDKControlRequest {
91    ControlRequest {
92        request_id: String,
93        request: SDKControlRequestPayload,
94    },
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SDKControlRequestPayload {
99    #[serde(rename = "subtype")]
100    pub request_subtype: String,
101    pub model: Option<String>,
102    #[serde(rename = "max_thinking_tokens")]
103    pub max_thinking_tokens: Option<u32>,
104    pub mode: Option<String>,
105}
106
107pub fn is_sdk_control_request(value: &serde_json::Value) -> bool {
108    value
109        .get("type")
110        .and_then(|v| v.as_str())
111        .map(|s| s == "control_request")
112        .unwrap_or(false)
113        && value.get("request_id").is_some()
114        && value.get("request").is_some()
115}
116
117// =============================================================================
118// MESSAGE ELIGIBILITY
119// =============================================================================
120
/// Internal classification of a message, used by the eligibility and
/// title-extraction helpers in this module.
///
/// The enum is fieldless, so it is `Copy` and can be compared with `==`
/// and used as a hash key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessageType {
    User,
    Assistant,
    System,
    ToolUse,
    ToolResult,
}
130
131/// Check if a message type should be forwarded to the bridge transport.
132/// The server only wants user/assistant turns and slash-command system events;
133/// everything else (tool_result, progress, etc.) is internal REPL chatter.
134pub fn is_eligible_bridge_message(
135    msg_type: &MessageType,
136    is_virtual: bool,
137    system_subtype: Option<&str>,
138) -> bool {
139    // Virtual messages (REPL inner calls) are display-only — bridge/SDK
140    // consumers see the REPL tool_use/result which summarizes the work.
141    if matches!(msg_type, MessageType::User | MessageType::Assistant) && is_virtual {
142        return false;
143    }
144    matches!(msg_type, MessageType::User | MessageType::Assistant)
145        || (matches!(msg_type, MessageType::System) && system_subtype == Some("local_command"))
146}
147
148// =============================================================================
149// TITLE TEXT EXTRACTION
150// =============================================================================
151
152/// Extract title-worthy text from a Message for onUserMessage. Returns
153/// None for messages that shouldn't title the session: non-user, meta
154/// (nudges), tool results, compact summaries, non-human origins (task
155/// notifications, channel messages), or pure display-tag content
156/// (<ide_opened_file>, <session-start-hook>, etc.).
157pub fn extract_title_text(
158    msg_type: &MessageType,
159    is_meta: bool,
160    tool_use_result: bool,
161    is_compact_summary: bool,
162    origin_kind: Option<&str>,
163    content: &str,
164) -> Option<String> {
165    // Filter out non-user, meta, tool results, compact summaries
166    if !matches!(msg_type, MessageType::User) || is_meta || tool_use_result || is_compact_summary {
167        return None;
168    }
169
170    // Filter out non-human origins
171    if let Some(kind) = origin_kind {
172        if kind != "human" {
173            return None;
174        }
175    }
176
177    // Extract text content
178    if content.is_empty() {
179        return None;
180    }
181
182    // Strip display tags (simplified - would need full implementation)
183    let clean = strip_display_tags_allow_empty(content);
184    if clean.is_empty() {
185        None
186    } else {
187        Some(clean)
188    }
189}
190
/// Remove display-only tag blocks (e.g. `<ide_opened_file>…</ide_opened_file>`,
/// `<session-start-hook>…</session-start-hook>`) from `s` and trim the result.
///
/// A block is an opening tag whose name starts with a lowercase ASCII letter
/// followed by ASCII letters, digits, `_` or `-` (optionally with
/// attributes), everything up to the first matching `</name>`, and one
/// directly following newline if present. Text that does not form such a
/// block, including a stray `<` or an unclosed tag, is kept as-is.
///
/// The result may be empty; the caller decides what an empty result means.
fn strip_display_tags_allow_empty(s: &str) -> String {
    let mut kept = String::with_capacity(s.len());
    let mut rest = s;
    while let Some(lt) = rest.find('<') {
        let (before, candidate) = rest.split_at(lt);
        kept.push_str(before);
        match display_tag_block_len(candidate) {
            Some(len) => rest = &candidate[len..],
            None => {
                // Not a tag block: keep the '<' and resume scanning after it.
                kept.push('<');
                rest = &candidate[1..];
            }
        }
    }
    kept.push_str(rest);
    kept.trim().to_string()
}

/// If `s` begins with a complete `<name …>…</name>` block, return the block's
/// length in bytes, including one trailing newline when present.
fn display_tag_block_len(s: &str) -> Option<usize> {
    let after_lt = s.strip_prefix('<')?;
    if !after_lt.chars().next()?.is_ascii_lowercase() {
        return None;
    }
    let name_len = after_lt
        .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '-'))
        .unwrap_or(after_lt.len());
    let (name, after_name) = after_lt.split_at(name_len);

    // The opening tag ends either right after the name or after attributes
    // that are separated from the name by whitespace.
    let body = if let Some(body) = after_name.strip_prefix('>') {
        body
    } else if after_name.starts_with(char::is_whitespace) {
        let gt = after_name.find('>')?;
        &after_name[gt + 1..]
    } else {
        return None;
    };

    let closing = format!("</{}>", name);
    let close_at = body.find(&closing)?;
    let body_offset = s.len() - body.len();
    let mut end = body_offset + close_at + closing.len();
    if s[end..].starts_with('\n') {
        end += 1;
    }
    Some(end)
}
197
198// =============================================================================
199// INGRESS ROUTING
200// =============================================================================
201
202/// Ingress message handler callback types
203pub type OnInboundMessage = Arc<dyn Fn(SDKMessage) + Send + Sync>;
204pub type OnPermissionResponse = Arc<dyn Fn(SDKControlResponse) + Send + Sync>;
205pub type OnControlRequest = Arc<dyn Fn(SDKControlRequest) + Send + Sync>;
206
207/// Parse an ingress WebSocket message and route it to the appropriate handler.
208/// Ignores messages whose UUID is in recentPostedUUIDs (echoes of what we sent)
209/// or in recentInboundUUIDs (re-deliveries we've already forwarded — e.g.
210/// server replayed history after a transport swap lost the seq-num cursor).
211pub fn handle_ingress_message(
212    data: &str,
213    recent_posted_uuids: &mut BoundedUuidSet,
214    recent_inbound_uuids: &mut BoundedUuidSet,
215    on_inbound_message: Option<&OnInboundMessage>,
216    on_permission_response: Option<&OnPermissionResponse>,
217    on_control_request: Option<&OnControlRequest>,
218    log_for_debugging: &dyn Fn(&str),
219) {
220    // Parse the JSON data
221    let parsed: serde_json::Value = match serde_json::from_str(data) {
222        Ok(v) => v,
223        Err(err) => {
224            log_for_debugging(&format!(
225                "[bridge:repl] Failed to parse ingress message: {}",
226                err
227            ));
228            return;
229        }
230    };
231
232    // control_response is not an SDKMessage — check before the type guard
233    if is_sdk_control_response(&parsed) {
234        log_for_debugging("[bridge:repl] Ingress message type=control_response");
235        if let Some(callback) = on_permission_response {
236            if let Ok(response) = serde_json::from_value::<SDKControlResponse>(parsed.clone()) {
237                callback(response);
238            }
239        }
240        return;
241    }
242
243    // control_request from the server (initialize, set_model, can_use_tool).
244    // Must respond promptly or the server kills the WS (~10-14s timeout).
245    if is_sdk_control_request(&parsed) {
246        let subtype = parsed
247            .get("request")
248            .and_then(|r| r.get("subtype"))
249            .and_then(|v| v.as_str())
250            .unwrap_or("unknown");
251        log_for_debugging(&format!(
252            "[bridge:repl] Inbound control_request subtype={}",
253            subtype
254        ));
255        if let Some(callback) = on_control_request {
256            if let Ok(request) = serde_json::from_value::<SDKControlRequest>(parsed.clone()) {
257                callback(request);
258            }
259        }
260        return;
261    }
262
263    if !is_sdk_message(&parsed) {
264        return;
265    }
266
267    // Check for UUID to detect echoes of our own messages
268    let uuid = parsed.get("uuid").and_then(|v| v.as_str());
269
270    if let Some(uuid_str) = uuid {
271        if recent_posted_uuids.contains(uuid_str) {
272            let msg_type = parsed
273                .get("type")
274                .and_then(|v| v.as_str())
275                .unwrap_or("unknown");
276            log_for_debugging(&format!(
277                "[bridge:repl] Ignoring echo: type={} uuid={}",
278                msg_type, uuid_str
279            ));
280            return;
281        }
282
283        // Defensive dedup: drop inbound prompts we've already forwarded.
284        if recent_inbound_uuids.contains(uuid_str) {
285            let msg_type = parsed
286                .get("type")
287                .and_then(|v| v.as_str())
288                .unwrap_or("unknown");
289            log_for_debugging(&format!(
290                "[bridge:repl] Ignoring re-delivered inbound: type={} uuid={}",
291                msg_type, uuid_str
292            ));
293            return;
294        }
295    }
296
297    let msg_type = parsed
298        .get("type")
299        .and_then(|v| v.as_str())
300        .unwrap_or("unknown");
301    let uuid_suffix = uuid.map(|u| format!(" uuid={}", u)).unwrap_or_default();
302    log_for_debugging(&format!(
303        "[bridge:repl] Ingress message type={}{}",
304        msg_type, uuid_suffix
305    ));
306
307    if msg_type == "user" {
308        if let Some(uuid_str) = uuid {
309            recent_inbound_uuids.add(uuid_str.to_string());
310        }
311        // Fire-and-forget — handler may be async (attachment resolution).
312        if let Some(callback) = on_inbound_message {
313            if let Ok(msg) = serde_json::from_value::<SDKMessage>(parsed.clone()) {
314                callback(msg);
315            }
316        }
317    } else {
318        log_for_debugging(&format!(
319            "[bridge:repl] Ignoring non-user inbound message: type={}",
320            msg_type
321        ));
322    }
323}
324
325// =============================================================================
326// SERVER-INITIATED CONTROL REQUESTS
327// =============================================================================
328
329/// Server control request handlers
330pub struct ServerControlRequestHandlers {
331    pub transport: Option<Box<dyn ReplBridgeTransport + Send>>,
332    pub session_id: String,
333    /// When true, all mutable requests (interrupt, set_model, set_permission_mode,
334    /// set_max_thinking_tokens) reply with an error instead of false-success.
335    /// initialize still replies success — the server kills the connection otherwise.
336    /// Used by the outbound-only bridge mode and the SDK's /bridge subpath so claude.ai sees a
337    /// proper error instead of "action succeeded but nothing happened locally".
338    pub outbound_only: bool,
339    pub on_interrupt: Option<Arc<dyn Fn() + Send + Sync>>,
340    pub on_set_model: Option<Arc<dyn Fn(Option<String>) + Send + Sync>>,
341    pub on_set_max_thinking_tokens: Option<Arc<dyn Fn(Option<u32>) + Send + Sync>>,
342    pub on_set_permission_mode: Option<Arc<dyn Fn(String) -> Result<(), String> + Send + Sync>>,
343}
344
345/// Trait for bridge transport
346pub trait ReplBridgeTransport {
347    fn write(&self, event: serde_json::Value) -> Result<(), String>;
348}
349
/// Error text returned for every non-`initialize` control request while the
/// handlers are in outbound-only mode.
const OUTBOUND_ONLY_ERROR: &str =
    "This session is outbound-only. Enable Remote Control locally to allow inbound control.";
352
353/// Respond to inbound control_request messages from the server. The server
354/// sends these for session lifecycle events (initialize, set_model) and
355/// for turn-level coordination (interrupt, set_max_thinking_tokens). If we
356/// don't respond, the server hangs and kills the WS after ~10-14s.
357pub fn handle_server_control_request(
358    request: &SDKControlRequest,
359    handlers: &ServerControlRequestHandlers,
360    log_for_debugging: &dyn Fn(&str),
361) {
362    let ServerControlRequestHandlers {
363        transport,
364        session_id,
365        outbound_only,
366        on_interrupt,
367        on_set_model,
368        on_set_max_thinking_tokens,
369        on_set_permission_mode,
370    } = handlers;
371
372    let Some(transport) = transport else {
373        log_for_debugging(
374            "[bridge:repl] Cannot respond to control_request: transport not configured",
375        );
376        return;
377    };
378
379    let SDKControlRequest::ControlRequest {
380        request_id,
381        request: request_payload,
382    } = request
383    else {
384        return;
385    };
386
387    let request_subtype = &request_payload.request_subtype;
388
389    let response: SDKControlResponse;
390
391    // Outbound-only: reply error for mutable requests so claude.ai doesn't show
392    // false success. initialize must still succeed (server kills the connection
393    // if it doesn't — see comment above).
394    if *outbound_only && request_subtype != "initialize" {
395        response = SDKControlResponse {
396            response_type: "control_response".to_string(),
397            response: SDKControlResponsePayload {
398                response_subtype: "error".to_string(),
399                request_id: request_id.clone(),
400                error: Some(OUTBOUND_ONLY_ERROR.to_string()),
401                response: None,
402            },
403        };
404        let event = serde_json::json!({
405            "type": "control_response",
406            "response": response.response,
407            "session_id": session_id
408        });
409        let _ = transport.write(event);
410        log_for_debugging(&format!(
411            "[bridge:repl] Rejected {} (outbound-only) request_id={}",
412            request_subtype, request_id
413        ));
414        return;
415    }
416
417    match request_subtype.as_str() {
418        "initialize" => {
419            // Respond with minimal capabilities — the REPL handles
420            // commands, models, and account info itself.
421            response = SDKControlResponse {
422                response_type: "control_response".to_string(),
423                response: SDKControlResponsePayload {
424                    response_subtype: "success".to_string(),
425                    request_id: request_id.clone(),
426                    error: None,
427                    response: Some(serde_json::json!({
428                        "commands": [],
429                        "output_style": "normal",
430                        "available_output_styles": ["normal"],
431                        "models": [],
432                        "account": {},
433                        "pid": std::process::id(),
434                    })),
435                },
436            };
437        }
438        "set_model" => {
439            on_set_model
440                .as_ref()
441                .map(|cb| cb(request_payload.model.clone()));
442            response = SDKControlResponse {
443                response_type: "control_response".to_string(),
444                response: SDKControlResponsePayload {
445                    response_subtype: "success".to_string(),
446                    request_id: request_id.clone(),
447                    error: None,
448                    response: None,
449                },
450            };
451        }
452        "set_max_thinking_tokens" => {
453            on_set_max_thinking_tokens
454                .as_ref()
455                .map(|cb| cb(request_payload.max_thinking_tokens));
456            response = SDKControlResponse {
457                response_type: "control_response".to_string(),
458                response: SDKControlResponsePayload {
459                    response_subtype: "success".to_string(),
460                    request_id: request_id.clone(),
461                    error: None,
462                    response: None,
463                },
464            };
465        }
466        "set_permission_mode" => {
467            // The callback returns a policy verdict so we can send an error
468            // control_response without importing isAutoModeGateEnabled /
469            // isBypassPermissionsModeDisabled here (bootstrap-isolation). If no
470            // callback is registered (daemon context, which doesn't wire this),
471            // return an error verdict rather than a silent false-success: the mode
472            // is never actually applied in that context, so success would lie to the client.
473            let mode = request_payload.mode.clone().unwrap_or_default();
474            let verdict = on_set_permission_mode
475                .as_ref()
476                .map(|cb| cb(mode.clone()))
477                .unwrap_or(Err(
478                    "set_permission_mode is not supported in this context (onSetPermissionMode callback not registered)".to_string()
479                ));
480
481            if verdict.is_ok() {
482                response = SDKControlResponse {
483                    response_type: "control_response".to_string(),
484                    response: SDKControlResponsePayload {
485                        response_subtype: "success".to_string(),
486                        request_id: request_id.clone(),
487                        error: None,
488                        response: None,
489                    },
490                };
491            } else {
492                response = SDKControlResponse {
493                    response_type: "control_response".to_string(),
494                    response: SDKControlResponsePayload {
495                        response_subtype: "error".to_string(),
496                        request_id: request_id.clone(),
497                        error: Some(verdict.err().unwrap_or_default()),
498                        response: None,
499                    },
500                };
501            }
502        }
503        "interrupt" => {
504            on_interrupt.as_ref().map(|cb| cb());
505            response = SDKControlResponse {
506                response_type: "control_response".to_string(),
507                response: SDKControlResponsePayload {
508                    response_subtype: "success".to_string(),
509                    request_id: request_id.clone(),
510                    error: None,
511                    response: None,
512                },
513            };
514        }
515        _ => {
516            // Unknown subtype — respond with error so the server doesn't
517            // hang waiting for a reply that never comes.
518            response = SDKControlResponse {
519                response_type: "control_response".to_string(),
520                response: SDKControlResponsePayload {
521                    response_subtype: "error".to_string(),
522                    request_id: request_id.clone(),
523                    error: Some(format!(
524                        "REPL bridge does not handle control_request subtype: {}",
525                        request_subtype
526                    )),
527                    response: None,
528                },
529            };
530        }
531    }
532
533    let event = serde_json::json!({
534        "type": "control_response",
535        "response": response.response,
536        "session_id": session_id
537    });
538    let _ = transport.write(event);
539    log_for_debugging(&format!(
540        "[bridge:repl] Sent control_response for {} request_id={} result={}",
541        request_subtype, request_id, request_payload.request_subtype
542    ));
543}
544
545// =============================================================================
546// RESULT MESSAGE (for session archival on teardown)
547// =============================================================================
548
549/// Empty usage for result message
550#[derive(Debug, Clone, Serialize, Deserialize, Default)]
551pub struct EmptyUsage {
552    pub input_tokens: u32,
553    pub output_tokens: u32,
554    #[serde(rename = "cache_creation_input_tokens")]
555    pub cache_creation_input_tokens: u32,
556    #[serde(rename = "cache_hit_input_tokens")]
557    pub cache_hit_input_tokens: u32,
558}
559
560/// Build a minimal `SDKResultSuccess` message for session archival.
561/// The server needs this event before a WS close to trigger archival.
562#[derive(Debug, Clone, Serialize, Deserialize)]
563pub struct SDKResultSuccess {
564    #[serde(rename = "type")]
565    pub result_type: String,
566    pub subtype: String,
567    #[serde(rename = "duration_ms")]
568    pub duration_ms: u64,
569    #[serde(rename = "duration_api_ms")]
570    pub duration_api_ms: u64,
571    #[serde(rename = "is_error")]
572    pub is_error: bool,
573    #[serde(rename = "num_turns")]
574    pub num_turns: u32,
575    pub result: String,
576    #[serde(rename = "stop_reason")]
577    pub stop_reason: Option<String>,
578    #[serde(rename = "total_cost_usd")]
579    pub total_cost_usd: f64,
580    pub usage: EmptyUsage,
581    #[serde(rename = "model_usage")]
582    pub model_usage: serde_json::Value,
583    #[serde(rename = "permission_denials")]
584    pub permission_denials: Vec<String>,
585    #[serde(rename = "session_id")]
586    pub session_id: String,
587    pub uuid: String,
588}
589
590pub fn make_result_message(session_id: &str) -> SDKResultSuccess {
591    SDKResultSuccess {
592        result_type: "result".to_string(),
593        subtype: "success".to_string(),
594        duration_ms: 0,
595        duration_api_ms: 0,
596        is_error: false,
597        num_turns: 0,
598        result: String::new(),
599        stop_reason: None,
600        total_cost_usd: 0.0,
601        usage: EmptyUsage {
602            input_tokens: 0,
603            output_tokens: 0,
604            cache_creation_input_tokens: 0,
605            cache_hit_input_tokens: 0,
606        },
607        model_usage: serde_json::json!({}),
608        permission_denials: vec![],
609        session_id: session_id.to_string(),
610        uuid: uuid::Uuid::new_v4().to_string(),
611    }
612}
613
614// =============================================================================
615// BOUNDED UUID SET (echo-dedup ring buffer)
616// =============================================================================
617
/// FIFO-bounded set backed by a circular buffer. Evicts the oldest entry
/// when capacity is reached, so at most `capacity` entries are held.
///
/// Messages are added in chronological order, so evicted entries are always
/// the oldest. The caller relies on external ordering as the primary dedup —
/// this set is a secondary safety net for echo filtering and race-condition
/// dedup.
#[derive(Debug, Clone)]
pub struct BoundedUuidSet {
    /// Maximum number of entries retained.
    capacity: usize,
    /// Insertion-ordered slots; `write_idx` is the next slot to overwrite.
    ring: Vec<Option<String>>,
    /// Mirror of the occupied slots, used for membership tests.
    set: HashSet<String>,
    write_idx: usize,
}

impl BoundedUuidSet {
    /// Create an empty set that retains at most `capacity` entries.
    ///
    /// A capacity of zero is allowed and yields a set that never stores
    /// anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            ring: vec![None; capacity],
            set: HashSet::with_capacity(capacity),
            write_idx: 0,
        }
    }

    /// Insert `uuid`, evicting the oldest entry when the set is full.
    ///
    /// Adding a value that is already present does nothing and does not
    /// refresh its age.
    pub fn add(&mut self, uuid: String) {
        // With zero capacity there is no slot to write to; indexing the ring
        // or taking `% capacity` would panic.
        if self.capacity == 0 || self.set.contains(&uuid) {
            return;
        }
        // Overwrite the slot at the write position, dropping whatever it held.
        if let Some(evicted) = self.ring[self.write_idx].replace(uuid.clone()) {
            self.set.remove(&evicted);
        }
        self.set.insert(uuid);
        self.write_idx = (self.write_idx + 1) % self.capacity;
    }

    /// Returns `true` when `uuid` is currently held.
    pub fn contains(&self, uuid: &str) -> bool {
        self.set.contains(uuid)
    }

    /// Remove every entry and reset the write position.
    pub fn clear(&mut self) {
        self.set.clear();
        self.ring.iter_mut().for_each(|slot| *slot = None);
        self.write_idx = 0;
    }
}

impl Default for BoundedUuidSet {
    /// A set with room for 100 entries.
    fn default() -> Self {
        Self::new(100)
    }
}