use std::path::Path;
use serde_json::json;
use tokio::time::{Duration, timeout};
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{ThreadStatusInfo, ThreadSummary};
use crate::storage::PRIMARY_RUNTIME_ID;
use super::super::events::handle_app_server_message;
use super::support::{bootstrap_test_state, primary_runtime};
/// Feeds an `AppServerInbound::Starting` message for the primary runtime into
/// the handler and checks that the runtime status snapshot reports "starting"
/// for both the overall status and the app-server handshake state.
#[tokio::test]
async fn app_server_starting_message_updates_status_without_deadlock() {
    let state = bootstrap_test_state().await;

    let starting = AppServerInbound::Starting {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
    };
    // The two-second timeout turns a hung handler into a test failure
    // instead of letting the test block indefinitely.
    let outcome = timeout(
        Duration::from_secs(2),
        handle_app_server_message(&state, starting),
    )
    .await;
    outcome
        .expect("处理 Starting 消息超时")
        .expect("处理 Starting 消息失败");

    let runtime = primary_runtime(&state).await;
    // Clone the status out of the read guard before asserting on it.
    let snapshot = runtime.status.read().await.clone();
    assert_eq!(snapshot.status, "starting");
    assert_eq!(snapshot.app_server_handshake.state, "starting");
}
/// Sends an `item/permissions/requestApproval` server request through the
/// handler and checks that exactly one pending request is stored, carrying
/// the request type, the thread id and a permissions payload.
#[tokio::test]
async fn permissions_server_request_is_persisted_for_clients() {
    let state = bootstrap_test_state().await;

    let request_params = json!({
        "threadId": "thread-1",
        "turnId": "turn-1",
        "itemId": "item-1",
        "reason": "需要额外权限",
        "permissions": {
            "network": { "enabled": true },
            "fileSystem": {
                "read": ["/srv/workspace/docs"],
                "write": ["/srv/workspace/tmp"]
            }
        }
    });
    let request = AppServerInbound::ServerRequest {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        id: json!("req-permissions-1"),
        method: "item/permissions/requestApproval".to_string(),
        params: request_params,
    };

    // Bound the handler so a hang is reported as a timeout failure.
    timeout(
        Duration::from_secs(2),
        handle_app_server_message(&state, request),
    )
    .await
    .expect("处理 permissions request 超时")
    .expect("处理 permissions request 失败");

    let pending = state
        .storage
        .list_pending_requests()
        .expect("读取 pending requests 失败");
    assert_eq!(1, pending.len());

    // Exactly one entry exists at this point, so inspect it directly.
    let stored = &pending[0];
    assert_eq!("item/permissions/requestApproval", stored.request_type);
    assert_eq!(Some("thread-1"), stored.thread_id.as_deref());
    assert!(stored.permissions.is_some());
}
/// Sends an `execCommandApproval` server request (keyed by `conversationId`
/// rather than `threadId`) through the handler and checks the stored pending
/// request: the conversation id appears as the thread id, the `command` array
/// appears as the single string "git status", and the four decision options
/// are listed.
#[tokio::test]
async fn legacy_server_request_is_normalized_into_pending_request() {
    let state = bootstrap_test_state().await;

    let legacy_params = json!({
        "conversationId": "thread-legacy",
        "callId": "call-exec-1",
        "approvalId": "approval-1",
        "command": ["git", "status"],
        "cwd": "/srv/workspace",
        "reason": "需要执行命令",
        "parsedCmd": []
    });
    let request = AppServerInbound::ServerRequest {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        id: json!("req-exec-1"),
        method: "execCommandApproval".to_string(),
        params: legacy_params,
    };

    // Bound the handler so a hang is reported as a timeout failure.
    timeout(
        Duration::from_secs(2),
        handle_app_server_message(&state, request),
    )
    .await
    .expect("处理 legacy request 超时")
    .expect("处理 legacy request 失败");

    let pending = state
        .storage
        .list_pending_requests()
        .expect("读取 pending requests 失败");
    assert_eq!(1, pending.len());

    let stored = &pending[0];
    assert_eq!("execCommandApproval", stored.request_type);
    assert_eq!(Some("thread-legacy"), stored.thread_id.as_deref());
    assert_eq!(Some("git status"), stored.command.as_deref());

    let expected_decisions = vec!["approved", "approved_for_session", "denied", "abort"];
    assert_eq!(expected_decisions, stored.available_decisions);
}
/// Stages an input image, registers its local path under turn "turn-cleanup",
/// then delivers a `turn/completed` notification for that turn and checks
/// that the staged file no longer exists on disk.
#[tokio::test]
async fn turn_completed_notification_cleans_up_staged_turn_inputs() {
    let state = bootstrap_test_state().await;

    let stage_payload = json!({
        "fileName": "cleanup.png",
        "mimeType": "image/png",
        "base64Data": "aGVsbG8="
    });
    let stage_response = state
        .handle_request("stage_input_image", stage_payload)
        .await
        .expect("stage_input_image 请求失败");

    let local_path = stage_response["image"]["localPath"]
        .as_str()
        .expect("应返回 localPath")
        .to_string();
    let staged_file = Path::new(&local_path);

    // Precondition: staging must have produced a file on disk.
    assert!(staged_file.exists());
    state.register_staged_turn_inputs("turn-cleanup", vec![local_path.clone().into()]);

    let completed = AppServerInbound::Notification {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        method: "turn/completed".to_string(),
        params: json!({
            "threadId": "thread-1",
            "turnId": "turn-cleanup",
            "status": { "type": "completed" }
        }),
    };
    // Bound the handler so a hang is reported as a timeout failure.
    timeout(
        Duration::from_secs(2),
        handle_app_server_message(&state, completed),
    )
    .await
    .expect("处理 turn/completed 超时")
    .expect("处理 turn/completed 失败");

    // The staged file must be gone once the turn has completed.
    assert!(!staged_file.exists());
}
/// Seeds the thread index with an active, loaded thread for the primary
/// runtime, delivers an `Exited` message for that runtime, and checks that:
/// - the stored thread ends up with status "notLoaded" and both `is_loaded`
///   and `is_active` cleared;
/// - a `thread/status/changed` event is emitted for that thread whose
///   payload reports `statusInfo.kind == "notLoaded"`.
#[tokio::test]
async fn runtime_exit_downgrades_cached_threads_and_emits_status_change() {
    let state = bootstrap_test_state().await;

    let cached_thread = ThreadSummary {
        id: "thread-stale".to_string(),
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        name: Some("待恢复线程".to_string()),
        preview: "preview".to_string(),
        cwd: "/srv/workspace".to_string(),
        status: "active".to_string(),
        status_info: ThreadStatusInfo {
            kind: "active".to_string(),
            reason: None,
            raw: json!({ "type": "active" }),
        },
        token_usage: None,
        model_provider: "openai".to_string(),
        source: "mobile".to_string(),
        created_at: 1,
        updated_at: 2,
        is_loaded: true,
        is_active: true,
        archived: false,
    };
    state
        .storage
        .upsert_thread_index(&cached_thread)
        .expect("写入缓存线程失败");

    // Subscribe before triggering the exit so the resulting event is not missed.
    let mut events = state.subscribe_events();

    let exited = AppServerInbound::Exited {
        runtime_id: PRIMARY_RUNTIME_ID.to_string(),
        message: "runtime stopped".to_string(),
        expected: true,
    };
    timeout(
        Duration::from_secs(2),
        handle_app_server_message(&state, exited),
    )
    .await
    .expect("处理 Exited 消息超时")
    .expect("处理 Exited 消息失败");

    // Skip unrelated events until the first thread status change shows up.
    let wait_for_status_change = async {
        loop {
            let event = events.recv().await.expect("应收到线程状态事件");
            if event.event_type == "thread/status/changed" {
                break event;
            }
        }
    };
    let thread_event = timeout(Duration::from_secs(2), wait_for_status_change)
        .await
        .expect("等待 thread/status/changed 超时");

    let downgraded = state
        .storage
        .get_thread_index("thread-stale")
        .expect("读取降级后的线程失败")
        .expect("降级后的线程不存在");
    assert_eq!(downgraded.status, "notLoaded");
    assert!(!downgraded.is_loaded);
    assert!(!downgraded.is_active);

    assert_eq!(thread_event.thread_id.as_deref(), Some("thread-stale"));
    assert_eq!(
        thread_event.payload["statusInfo"]["kind"],
        json!("notLoaded")
    );
}