use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::{mpsc, oneshot, Mutex};
use tracing::{debug, error, info, warn};
struct ResolvedCommand {
command: String,
login_shell_path: Option<String>,
}
async fn resolve_command_path(command: &str) -> ResolvedCommand {
if command.starts_with('/') {
debug!(command = %command, "ACP command is absolute, skipping resolution");
return ResolvedCommand {
command: command.to_string(),
login_shell_path: resolve_login_shell_path().await,
};
}
let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
let which_arg = format!("which {}", command);
debug!(
shell = %shell,
command = %command,
"Resolving ACP command via login shell"
);
let result = tokio::process::Command::new(&shell)
.arg("-l")
.arg("-c")
.arg(&which_arg)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.output()
.await;
let resolved_cmd = match result {
Ok(output) if output.status.success() => {
let resolved = String::from_utf8_lossy(&output.stdout).trim().to_string();
if resolved.is_empty() {
warn!(
command = %command,
"Login shell 'which' returned empty output, falling back to original"
);
command.to_string()
} else {
info!(
command = %command,
resolved = %resolved,
"Resolved ACP command via login shell"
);
resolved
}
}
Ok(output) => {
warn!(
command = %command,
exit_code = ?output.status.code(),
stderr = %String::from_utf8_lossy(&output.stderr).trim(),
"Failed to resolve ACP command via login shell, using original"
);
command.to_string()
}
Err(e) => {
warn!(
command = %command,
error = %e,
"Failed to run login shell for command resolution, using original"
);
command.to_string()
}
};
ResolvedCommand {
command: resolved_cmd,
login_shell_path: resolve_login_shell_path().await,
}
}
async fn resolve_login_shell_path() -> Option<String> {
let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
let result = tokio::process::Command::new(&shell)
.arg("-l")
.arg("-c")
.arg("echo $PATH")
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.output()
.await;
match result {
Ok(output) if output.status.success() => {
let path = String::from_utf8_lossy(&output.stdout).trim().to_string();
if path.is_empty() {
None
} else {
Some(path)
}
}
_ => None,
}
}
use crate::config::ProposalSessionConfig;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<Value>,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcResponse {
pub jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
pub code: i64,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcNotification {
pub jsonrpc: String,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<Value>,
}
#[derive(Debug, Clone)]
pub enum AcpMessage {
Notification(JsonRpcNotification),
}
impl AcpMessage {
pub fn as_update(&self) -> Option<AcpUpdateParams> {
match self {
AcpMessage::Notification(notif) if notif.method == "session/update" => notif
.params
.clone()
.and_then(|params| serde_json::from_value::<AcpUpdateParams>(params).ok()),
_ => None,
}
}
pub fn as_elicitation(&self) -> Option<AcpElicitationParams> {
match self {
AcpMessage::Notification(notif) if notif.method == "session/elicitation" => notif
.params
.clone()
.and_then(|params| serde_json::from_value::<AcpElicitationParams>(params).ok()),
_ => None,
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct AcpUpdateParams {
#[serde(default, rename = "sessionId")]
#[allow(dead_code)]
pub session_id: Option<String>,
pub update: AcpEvent,
}
#[derive(Debug, Clone, Deserialize)]
pub struct AcpElicitationParams {
#[serde(default, rename = "sessionId")]
pub session_id: Option<String>,
#[serde(default, rename = "requestId")]
pub request_id: String,
#[serde(default)]
pub mode: String,
#[serde(default)]
pub message: String,
#[serde(default)]
pub schema: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "sessionUpdate", rename_all = "snake_case")]
pub enum AcpEvent {
AgentMessageChunk {
#[serde(default)]
content: Option<AcpContent>,
},
AgentThoughtChunk {
#[serde(default)]
content: Option<AcpContent>,
},
ToolCall {
#[serde(default, rename = "toolCallId")]
tool_call_id: String,
#[serde(default)]
title: String,
#[serde(default)]
kind: String,
#[serde(default)]
status: String,
},
ToolCallUpdate {
#[serde(default, rename = "toolCallId")]
tool_call_id: String,
#[serde(default)]
status: String,
#[serde(default)]
content: Vec<Value>,
},
Elicitation {
#[serde(default, rename = "requestId")]
request_id: String,
#[serde(default)]
mode: String,
#[serde(default)]
message: String,
#[serde(default)]
schema: Option<Value>,
},
TurnComplete {
#[serde(default, rename = "stopReason")]
stop_reason: String,
},
#[serde(other)]
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcpContent {
#[serde(default, rename = "type")]
pub content_type: String,
#[serde(default)]
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AcpPromptBlock {
#[serde(rename = "type")]
pub content_type: String,
pub text: String,
}
impl AcpPromptBlock {
pub fn text(text: impl Into<String>) -> Self {
Self {
content_type: "text".to_string(),
text: text.into(),
}
}
}
async fn dispatch_jsonrpc_response(
pending_requests: &Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>>,
value: Value,
) -> bool {
let response = match serde_json::from_value::<JsonRpcResponse>(value.clone()) {
Ok(resp) => resp,
Err(e) => {
warn!(error = %e, payload = %value, "Failed to parse JSON-RPC response");
return true;
}
};
let Some(raw_id) = response.id.clone() else {
warn!(payload = %value, "JSON-RPC response missing id field");
return true;
};
let Some(request_id) = raw_id.as_u64() else {
warn!(response_id = %raw_id, "JSON-RPC response id is not u64");
return true;
};
let waiter = {
let mut pending = pending_requests.lock().await;
pending.remove(&request_id)
};
match waiter {
Some(tx) => {
if tx.send(response).is_err() {
debug!(request_id, "Response waiter dropped before delivery");
}
}
None => {
warn!(request_id, "Received response for unknown request id");
}
}
true
}
pub struct AcpClient {
stdin_tx: mpsc::Sender<String>,
notification_tx: mpsc::Sender<AcpMessage>,
notification_rx: Mutex<mpsc::Receiver<AcpMessage>>,
next_id: AtomicU64,
pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>>,
child: Mutex<Option<Child>>,
initialized: Mutex<bool>,
working_dir: PathBuf,
}
impl AcpClient {
#[cfg(test)]
pub fn new_for_test() -> Self {
let (stdin_tx, _stdin_rx) = mpsc::channel::<String>(1);
let (notification_tx, notification_rx) = mpsc::channel::<AcpMessage>(1);
Self {
stdin_tx,
notification_tx,
notification_rx: Mutex::new(notification_rx),
next_id: AtomicU64::new(1),
pending_requests: Arc::new(Mutex::new(HashMap::new())),
child: Mutex::new(None),
initialized: Mutex::new(false),
working_dir: PathBuf::new(),
}
}
pub async fn spawn(
config: &ProposalSessionConfig,
working_dir: &Path,
) -> Result<Arc<Self>, AcpError> {
let resolved = resolve_command_path(&config.transport_command).await;
info!(
cmd = %config.transport_command,
resolved_cmd = %resolved.command,
args = ?config.transport_args,
cwd = %working_dir.display(),
"Spawning ACP subprocess"
);
let mut cmd = Command::new(&resolved.command);
cmd.args(&config.transport_args)
.current_dir(working_dir)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
if let Some(ref login_path) = resolved.login_shell_path {
cmd.env("PATH", login_path);
}
for (key, value) in &config.transport_env {
cmd.env(key, value);
}
let mut child = cmd.spawn().map_err(|e| AcpError::SpawnFailed {
command: resolved.command.clone(),
reason: e.to_string(),
})?;
let child_stdin = child.stdin.take().ok_or(AcpError::StdioPipeMissing)?;
let child_stdout = child.stdout.take().ok_or(AcpError::StdioPipeMissing)?;
let (stdin_tx, mut stdin_rx) = mpsc::channel::<String>(64);
let (notif_tx, notif_rx) = mpsc::channel::<AcpMessage>(256);
let mut writer = child_stdin;
tokio::spawn(async move {
while let Some(line) = stdin_rx.recv().await {
if let Err(e) = writer.write_all(line.as_bytes()).await {
error!(error = %e, "Failed to write to ACP stdin");
break;
}
if let Err(e) = writer.write_all(b"\n").await {
error!(error = %e, "Failed to write newline to ACP stdin");
break;
}
if let Err(e) = writer.flush().await {
error!(error = %e, "Failed to flush ACP stdin");
break;
}
}
debug!("ACP stdin writer task ended");
});
let pending_requests: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(Mutex::new(HashMap::new()));
let notif_tx_clone = notif_tx.clone();
let pending_requests_for_stdout = pending_requests.clone();
tokio::spawn(async move {
let reader = BufReader::new(child_stdout);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
debug!(line = %line, "ACP stdout");
let Ok(val) = serde_json::from_str::<Value>(&line) else {
debug!(line = %line, "Non-JSON line from ACP stdout");
continue;
};
let has_method = val.get("method").is_some();
let has_id = val.get("id").is_some();
if has_method {
if has_id {
warn!(
payload = %val,
"Received unsupported JSON-RPC server request on ACP stdout"
);
continue;
}
match serde_json::from_value::<JsonRpcNotification>(val) {
Ok(notif) => {
let msg = AcpMessage::Notification(notif);
if notif_tx_clone.send(msg).await.is_err() {
debug!("Notification channel closed");
break;
}
}
Err(e) => {
warn!(error = %e, "Failed to parse JSON-RPC notification");
}
}
continue;
}
if !dispatch_jsonrpc_response(&pending_requests_for_stdout, val).await {
break;
}
}
debug!("ACP stdout reader task ended");
});
if let Some(stderr) = child.stderr.take() {
tokio::spawn(async move {
let reader = BufReader::new(stderr);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
debug!(target: "acp_stderr", "{}", line);
}
});
}
let client = Arc::new(Self {
stdin_tx,
notification_tx: notif_tx.clone(),
notification_rx: Mutex::new(notif_rx),
next_id: AtomicU64::new(1),
pending_requests: pending_requests.clone(),
child: Mutex::new(Some(child)),
initialized: Mutex::new(false),
working_dir: working_dir.to_path_buf(),
});
Ok(client)
}
pub async fn initialize(&self) -> Result<Value, AcpError> {
let params = serde_json::json!({
"protocolVersion": 1,
"clientInfo": {
"name": "conflux-dashboard",
"version": env!("CARGO_PKG_VERSION")
},
"capabilities": {
"elicitation": {
"form": {}
}
}
});
let result = self.send_request("initialize", Some(params)).await?;
let mut init = self.initialized.lock().await;
*init = true;
info!("ACP initialize handshake completed");
Ok(result)
}
pub async fn create_session(&self) -> Result<String, AcpError> {
let cwd = self.working_dir.to_str().unwrap_or(".").to_string();
let result = self
.send_request(
"session/new",
Some(serde_json::json!({
"cwd": cwd,
"mcpServers": []
})),
)
.await?;
let session_id = result
.get("sessionId")
.and_then(|v| v.as_str())
.ok_or_else(|| AcpError::Protocol("session/new response missing 'sessionId'".into()))?
.to_string();
info!(acp_session_id = %session_id, "ACP session created");
Ok(session_id)
}
pub async fn send_prompt(
self: &Arc<Self>,
session_id: &str,
text: &str,
) -> Result<(), AcpError> {
self.send_prompt_with_prefix(session_id, &[], text).await
}
pub async fn send_prompt_with_prefix(
self: &Arc<Self>,
session_id: &str,
prefix_blocks: &[AcpPromptBlock],
text: &str,
) -> Result<(), AcpError> {
let mut prompt = Vec::with_capacity(prefix_blocks.len() + 1);
prompt.extend(prefix_blocks.iter().cloned());
prompt.push(AcpPromptBlock::text(text));
let params = serde_json::json!({
"sessionId": session_id,
"prompt": prompt
});
let client = Arc::clone(self);
let method = "session/prompt".to_string();
tokio::spawn(async move {
match client.send_request(&method, Some(params)).await {
Ok(_) => {
debug!("session/prompt completed");
}
Err(e) => warn!(error = %e, "session/prompt request failed"),
}
});
Ok(())
}
pub async fn cancel(&self, session_id: &str) -> Result<(), AcpError> {
self.send_notification(
"session/cancel",
Some(serde_json::json!({
"sessionId": session_id
})),
)
.await
}
pub async fn respond_elicitation(
&self,
request_id: &str,
action: &str,
content: Option<Value>,
) -> Result<(), AcpError> {
let mut params = serde_json::json!({
"requestId": request_id,
"action": action
});
if let Some(c) = content {
params["content"] = c;
}
let response = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
id: Some(Value::String(request_id.to_string())),
result: Some(params),
error: None,
};
let line = serde_json::to_string(&response)
.map_err(|e| AcpError::Protocol(format!("Failed to serialize response: {}", e)))?;
self.stdin_tx
.send(line)
.await
.map_err(|_| AcpError::ProcessExited)?;
Ok(())
}
pub async fn recv_notification(&self) -> Option<AcpMessage> {
let mut rx = self.notification_rx.lock().await;
let result = rx.recv().await;
if let Some(AcpMessage::Notification(ref notif)) = result {
debug!(method = %notif.method, "recv_notification received");
}
result
}
pub async fn kill(&self) {
let mut child = self.child.lock().await;
if let Some(ref mut c) = *child {
match c.kill().await {
Ok(()) => info!("ACP subprocess killed"),
Err(e) => warn!(error = %e, "Failed to kill ACP subprocess"),
}
}
*child = None;
let mut pending = self.pending_requests.lock().await;
pending.clear();
}
async fn send_request(&self, method: &str, params: Option<Value>) -> Result<Value, AcpError> {
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let (response_tx, response_rx) = oneshot::channel::<JsonRpcResponse>();
{
let mut pending = self.pending_requests.lock().await;
pending.insert(id, response_tx);
}
let request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: Some(Value::Number(id.into())),
method: method.to_string(),
params,
};
let line = serde_json::to_string(&request)
.map_err(|e| AcpError::Protocol(format!("Failed to serialize request: {}", e)))?;
debug!(method = %method, id = %id, "Sending ACP request");
if self.stdin_tx.send(line).await.is_err() {
let mut pending = self.pending_requests.lock().await;
pending.remove(&id);
return Err(AcpError::ProcessExited);
}
let response =
match tokio::time::timeout(std::time::Duration::from_secs(30), response_rx).await {
Ok(Ok(resp)) => resp,
Ok(Err(_)) => {
return Err(AcpError::ProcessExited);
}
Err(_) => {
let mut pending = self.pending_requests.lock().await;
pending.remove(&id);
return Err(AcpError::Timeout {
method: method.to_string(),
});
}
};
if let Some(err) = response.error {
return Err(AcpError::RpcError {
code: err.code,
message: err.message,
});
}
Ok(response.result.unwrap_or(Value::Null))
}
async fn send_notification(&self, method: &str, params: Option<Value>) -> Result<(), AcpError> {
let notification = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: method.to_string(),
params,
};
let line = serde_json::to_string(¬ification)
.map_err(|e| AcpError::Protocol(format!("Failed to serialize notification: {}", e)))?;
debug!(method = %method, "Sending ACP notification");
self.stdin_tx
.send(line)
.await
.map_err(|_| AcpError::ProcessExited)?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum AcpError {
#[error("Failed to spawn ACP command '{command}': {reason}")]
SpawnFailed { command: String, reason: String },
#[error("ACP subprocess stdio pipe not available")]
StdioPipeMissing,
#[error("ACP subprocess has exited unexpectedly")]
ProcessExited,
#[error("ACP protocol error: {0}")]
Protocol(String),
#[error("ACP JSON-RPC error (code={code}): {message}")]
RpcError { code: i64, message: String },
#[error("Timeout waiting for ACP response to '{method}'")]
Timeout { method: String },
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_resolve_command_path_absolute_unchanged() {
let result = resolve_command_path("/usr/bin/echo").await;
assert_eq!(result.command, "/usr/bin/echo");
}
#[tokio::test]
async fn test_resolve_command_path_relative_resolves() {
let result = resolve_command_path("cat").await;
assert!(
result.command.starts_with('/'),
"Expected absolute path for 'cat', got: {}",
result.command
);
}
#[tokio::test]
async fn test_resolve_command_path_fallback() {
let result = resolve_command_path("nonexistent-binary-xyz-12345").await;
assert_eq!(result.command, "nonexistent-binary-xyz-12345");
}
#[test]
fn test_jsonrpc_request_serialization() {
let req = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: Some(Value::Number(1.into())),
method: "initialize".to_string(),
params: Some(serde_json::json!({"key": "value"})),
};
let json = serde_json::to_string(&req).unwrap();
assert!(json.contains("\"jsonrpc\":\"2.0\""));
assert!(json.contains("\"method\":\"initialize\""));
}
#[test]
fn test_jsonrpc_response_deserialization() {
let json = r#"{"jsonrpc":"2.0","id":1,"result":{"id":"session-123"}}"#;
let resp: JsonRpcResponse = serde_json::from_str(json).unwrap();
assert_eq!(resp.id, Some(Value::Number(1.into())));
assert!(resp.result.is_some());
assert!(resp.error.is_none());
}
#[test]
fn test_jsonrpc_error_response_deserialization() {
let json =
r#"{"jsonrpc":"2.0","id":2,"error":{"code":-32600,"message":"Invalid Request"}}"#;
let resp: JsonRpcResponse = serde_json::from_str(json).unwrap();
assert!(resp.error.is_some());
let err = resp.error.unwrap();
assert_eq!(err.code, -32600);
}
#[test]
fn test_jsonrpc_notification_serialization() {
let notif = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "session/update".to_string(),
params: Some(serde_json::json!({"type": "agent_message_chunk", "text": "hello"})),
};
let json = serde_json::to_string(¬if).unwrap();
assert!(json.contains("\"method\":\"session/update\""));
assert!(!json.contains("\"id\""));
}
#[test]
fn test_acp_event_deserialization() {
let json = r#"{"sessionUpdate": "agent_message_chunk", "content": {"type": "text", "text": "Hello"}}"#;
let event: AcpEvent = serde_json::from_str(json).unwrap();
match event {
AcpEvent::AgentMessageChunk { content } => {
assert_eq!(content.unwrap().text, "Hello");
}
_ => panic!("Expected AgentMessageChunk"),
}
}
#[test]
fn test_acp_event_tool_call() {
let json = r#"{"sessionUpdate": "tool_call", "toolCallId": "tc1", "title": "Read file", "kind": "read", "status": "pending"}"#;
let event: AcpEvent = serde_json::from_str(json).unwrap();
match event {
AcpEvent::ToolCall {
tool_call_id,
title,
..
} => {
assert_eq!(tool_call_id, "tc1");
assert_eq!(title, "Read file");
}
_ => panic!("Expected ToolCall"),
}
}
#[test]
fn test_acp_event_turn_complete() {
let json = r#"{"sessionUpdate": "turn_complete", "stopReason": "end_turn"}"#;
let event: AcpEvent = serde_json::from_str(json).unwrap();
match event {
AcpEvent::TurnComplete { stop_reason } => {
assert_eq!(stop_reason, "end_turn");
}
_ => panic!("Expected TurnComplete"),
}
}
#[test]
fn test_acp_event_elicitation() {
let json = r#"{"sessionUpdate": "elicitation", "requestId": "req1", "mode": "form", "message": "Choose option", "schema": {"type": "object"}}"#;
let event: AcpEvent = serde_json::from_str(json).unwrap();
match event {
AcpEvent::Elicitation {
request_id,
mode,
message,
schema,
} => {
assert_eq!(request_id, "req1");
assert_eq!(mode, "form");
assert_eq!(message, "Choose option");
assert!(schema.is_some());
}
_ => panic!("Expected Elicitation"),
}
}
#[tokio::test]
async fn dispatch_jsonrpc_response_routes_by_request_id() {
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(Mutex::new(HashMap::new()));
let (tx_1, rx_1) = oneshot::channel::<JsonRpcResponse>();
let (tx_2, rx_2) = oneshot::channel::<JsonRpcResponse>();
{
let mut guard = pending.lock().await;
guard.insert(1, tx_1);
guard.insert(2, tx_2);
}
let handled = dispatch_jsonrpc_response(
&pending,
serde_json::json!({"jsonrpc": "2.0", "id": 2, "result": {"ok": true}}),
)
.await;
assert!(handled);
assert!(
tokio::time::timeout(std::time::Duration::from_millis(50), rx_1)
.await
.is_err()
);
let response = tokio::time::timeout(std::time::Duration::from_millis(200), rx_2)
.await
.expect("response should be delivered")
.expect("waiter should stay alive");
assert_eq!(response.id, Some(Value::Number(2_u64.into())));
let guard = pending.lock().await;
assert!(guard.contains_key(&1));
assert!(!guard.contains_key(&2));
}
#[tokio::test]
async fn dispatch_jsonrpc_response_ignores_non_u64_id_without_consuming_waiters() {
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcResponse>>>> =
Arc::new(Mutex::new(HashMap::new()));
let (tx_1, rx_1) = oneshot::channel::<JsonRpcResponse>();
{
let mut guard = pending.lock().await;
guard.insert(1, tx_1);
}
let handled = dispatch_jsonrpc_response(
&pending,
serde_json::json!({"jsonrpc": "2.0", "id": "string-id", "result": {"ok": true}}),
)
.await;
assert!(handled);
assert!(
tokio::time::timeout(std::time::Duration::from_millis(50), rx_1)
.await
.is_err()
);
let guard = pending.lock().await;
assert!(guard.contains_key(&1));
}
}