codex-mobile-bridge 0.3.10

Remote bridge and service manager for codex-mobile.
Documentation
use serde_json::json;
use tokio::time::{Duration, timeout};

use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{ThreadRenderNode, ThreadRenderSnapshot};
use crate::storage::PRIMARY_RUNTIME_ID;

use super::super::events::handle_app_server_message;
use super::support::bootstrap_test_state;

#[tokio::test]
async fn command_execution_completion_keeps_streamed_output_when_turn_id_changes() {
    let state = bootstrap_test_state().await;

    timeout(
        Duration::from_secs(2),
        handle_app_server_message(
            &state,
            AppServerInbound::Notification {
                runtime_id: PRIMARY_RUNTIME_ID.to_string(),
                method: "item/commandExecution/outputDelta".to_string(),
                params: json!({
                    "threadId": "thread-render-exec-delta",
                    "turnId": "turn-delta",
                    "itemId": "item-ran",
                    "delta": "partial output\n"
                }),
            },
        ),
    )
    .await
    .expect("处理 commandExecution outputDelta 超时")
    .expect("处理 commandExecution outputDelta 失败");

    timeout(
        Duration::from_secs(2),
        handle_app_server_message(
            &state,
            AppServerInbound::Notification {
                runtime_id: PRIMARY_RUNTIME_ID.to_string(),
                method: "item/completed".to_string(),
                params: json!({
                    "threadId": "thread-render-exec-delta",
                    "turnId": "turn-completed",
                    "item": {
                        "id": "item-ran",
                        "type": "commandExecution",
                        "status": "completed",
                        "command": "echo partial",
                        "exitCode": 0,
                        "commandActions": [
                            {
                                "type": "run",
                                "command": "echo partial"
                            }
                        ]
                    }
                }),
            },
        ),
    )
    .await
    .expect("处理 commandExecution item/completed 超时")
    .expect("处理 commandExecution item/completed 失败");

    let snapshots = state
        .thread_render_snapshots
        .lock()
        .expect("thread render snapshots poisoned");
    let snapshot = snapshots
        .get("thread-render-exec-delta")
        .expect("应缓存 thread render snapshot");
    let exec_node = snapshot
        .nodes
        .iter()
        .find_map(|node| match node {
            ThreadRenderNode::ExecGroup {
                kind,
                output_text,
                commands,
                ..
            } => Some((kind.clone(), output_text.clone(), commands.clone())),
            _ => None,
        })
        .expect("应存在 exec group 节点");
    assert_eq!("ran", exec_node.0);
    assert_eq!(Some("partial output\n".to_string()), exec_node.1);
    assert_eq!(1, exec_node.2.len());
    assert_eq!("echo partial", exec_node.2[0].text);
}

#[tokio::test]
async fn command_execution_completion_prefers_final_aggregated_output() {
    let state = bootstrap_test_state().await;

    timeout(
        Duration::from_secs(2),
        handle_app_server_message(
            &state,
            AppServerInbound::Notification {
                runtime_id: PRIMARY_RUNTIME_ID.to_string(),
                method: "item/commandExecution/outputDelta".to_string(),
                params: json!({
                    "threadId": "thread-render-exec-final",
                    "turnId": "turn-final",
                    "itemId": "item-ran-final",
                    "delta": "stale output\n"
                }),
            },
        ),
    )
    .await
    .expect("处理 commandExecution outputDelta 超时")
    .expect("处理 commandExecution outputDelta 失败");

    timeout(
        Duration::from_secs(2),
        handle_app_server_message(
            &state,
            AppServerInbound::Notification {
                runtime_id: PRIMARY_RUNTIME_ID.to_string(),
                method: "item/completed".to_string(),
                params: json!({
                    "threadId": "thread-render-exec-final",
                    "turnId": "turn-final",
                    "item": {
                        "id": "item-ran-final",
                        "type": "commandExecution",
                        "status": "completed",
                        "command": "printf final",
                        "aggregatedOutput": "final output\n",
                        "exitCode": 0,
                        "commandActions": [
                            {
                                "type": "run",
                                "command": "printf final"
                            }
                        ]
                    }
                }),
            },
        ),
    )
    .await
    .expect("处理 commandExecution item/completed 超时")
    .expect("处理 commandExecution item/completed 失败");

    let snapshots = state
        .thread_render_snapshots
        .lock()
        .expect("thread render snapshots poisoned");
    let snapshot = snapshots
        .get("thread-render-exec-final")
        .expect("应缓存 thread render snapshot");
    let output_text = snapshot
        .nodes
        .iter()
        .find_map(|node| match node {
            ThreadRenderNode::ExecGroup { output_text, .. } => output_text.clone(),
            _ => None,
        })
        .expect("应存在 exec group 输出");
    assert_eq!("final output\n", output_text);
}

#[tokio::test]
async fn cache_thread_render_snapshot_backfills_missing_exec_output() {
    let state = bootstrap_test_state().await;
    let thread_id = "thread-render-exec-cache";
    let existing = state.cache_thread_render_snapshot(ThreadRenderSnapshot {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        thread_id: thread_id.to_string(),
        revision: 1,
        status_surface: None,
        nodes: vec![ThreadRenderNode::ExecGroup {
            id: "turn-live:item-cache:exec".to_string(),
            turn_id: Some("turn-live".to_string()),
            item_id: Some("item-cache".to_string()),
            title: "Ran".to_string(),
            kind: "ran".to_string(),
            state: "completed".to_string(),
            commands: Vec::new(),
            output_text: Some("streamed output\n".to_string()),
            exit_code: Some(0),
        }],
    });
    assert_eq!(
        Some("streamed output\n".to_string()),
        existing.nodes.iter().find_map(|node| match node {
            ThreadRenderNode::ExecGroup { output_text, .. } => output_text.clone(),
            _ => None,
        })
    );

    let merged = state.cache_thread_render_snapshot(ThreadRenderSnapshot {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        thread_id: thread_id.to_string(),
        revision: 1,
        status_surface: None,
        nodes: vec![ThreadRenderNode::ExecGroup {
            id: "turn-read:item-cache:exec".to_string(),
            turn_id: Some("turn-read".to_string()),
            item_id: Some("item-cache".to_string()),
            title: "Ran".to_string(),
            kind: "ran".to_string(),
            state: "completed".to_string(),
            commands: Vec::new(),
            output_text: None,
            exit_code: Some(0),
        }],
    });

    assert_eq!(
        Some("streamed output\n".to_string()),
        merged.nodes.iter().find_map(|node| match node {
            ThreadRenderNode::ExecGroup { output_text, .. } => output_text.clone(),
            _ => None,
        })
    );
}