use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use agent_context::{
AgentContext, AppendMsg, ChangeEvent, ContextBackend, ContextBackendResponse, ContextMessage,
MessagesMsg, ResponseType, SendMsg, SendStreamMsg,
};
use agent_context::{AgentError, Role};
use kameo::prelude::*;
use serde::{Deserialize, Serialize};
struct EventCollector {
slots: Box<[std::cell::UnsafeCell<Option<String>>]>,
len: AtomicUsize,
}
unsafe impl Send for EventCollector {}
unsafe impl Sync for EventCollector {}
impl EventCollector {
fn new(capacity: usize) -> Self {
let slots = (0..capacity)
.map(|_| std::cell::UnsafeCell::new(None))
.collect::<Vec<_>>()
.into_boxed_slice();
Self {
slots,
len: AtomicUsize::new(0),
}
}
fn push(&self, value: String) {
let idx = self.len.fetch_add(1, Ordering::Relaxed);
if idx < self.slots.len() {
unsafe {
*self.slots[idx].get() = Some(value);
}
}
}
fn collect(&self) -> Vec<String> {
let len = self.len.load(Ordering::Relaxed);
(0..len)
.filter_map(|i| unsafe { (*self.slots[i].get()).take() })
.collect()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ToolCall {
id: String,
function_name: String,
arguments: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RichMockMessage {
role: Role,
content: String,
reasoning_content: Option<String>,
tool_calls: Option<Vec<ToolCall>>,
tool_call_id: Option<String>,
}
impl RichMockMessage {
fn user(content: impl Into<String>) -> Self {
Self {
role: Role::User,
content: content.into(),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
}
}
fn system(content: impl Into<String>) -> Self {
Self {
role: Role::System,
content: content.into(),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
}
}
fn assistant(content: impl Into<String>) -> Self {
Self {
role: Role::Assistant,
content: content.into(),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
}
}
fn assistant_with_reasoning(content: impl Into<String>, reasoning: impl Into<String>) -> Self {
Self {
role: Role::Assistant,
content: content.into(),
reasoning_content: Some(reasoning.into()),
tool_calls: None,
tool_call_id: None,
}
}
fn assistant_with_tool_calls(
content: impl Into<String>,
reasoning: impl Into<String>,
tool_calls: Vec<ToolCall>,
) -> Self {
Self {
role: Role::Assistant,
content: content.into(),
reasoning_content: Some(reasoning.into()),
tool_calls: Some(tool_calls),
tool_call_id: None,
}
}
fn tool_result(tool_call_id: impl Into<String>, content: impl Into<String>) -> Self {
Self {
role: Role::Tool,
content: content.into(),
reasoning_content: None,
tool_calls: None,
tool_call_id: Some(tool_call_id.into()),
}
}
}
impl ContextMessage for RichMockMessage {
fn role(&self) -> Role {
self.role
}
fn preserve_reasoning(&self) -> bool {
self.role == Role::Assistant && self.tool_calls.is_some()
}
fn without_reasoning(mut self) -> Self {
self.reasoning_content = None;
self
}
fn with_role(mut self, role: Role) -> Self {
self.role = role;
self
}
}
#[derive(Clone, Default)]
pub struct RichMockOpts {
pub token: u64,
pub scratch: Option<String>,
}
impl agent_context::ScratchOpts for RichMockOpts {
fn scratch(&self) -> Option<&str> {
self.scratch.as_deref()
}
}
#[derive(Clone)]
struct RichMockBackend {
send_responses: Vec<Vec<RichMockMessage>>,
call_index: Arc<std::sync::atomic::AtomicUsize>,
token_count: usize,
last_opts_token: Arc<std::sync::atomic::AtomicU64>,
}
impl RichMockBackend {
fn new(responses: Vec<Vec<RichMockMessage>>) -> Self {
Self {
send_responses: responses,
call_index: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
}
#[derive(Debug, Clone)]
struct RichMockResponse(Vec<RichMockMessage>);
impl ContextBackendResponse for RichMockResponse {
fn response_type(&self) -> ResponseType {
let has_r = self
.0
.iter()
.any(|m| m.reasoning_content.as_ref().is_some_and(|r| !r.is_empty()));
let has_c = self.0.iter().any(|m| !m.content.is_empty());
match (has_r, has_c) {
(false, false) => ResponseType::Empty,
(true, false) => ResponseType::Reasoning,
(false, true) => ResponseType::Content,
(true, true) => ResponseType::ReasoningAndContent,
}
}
fn reasoning_content(&self) -> Option<String> {
let text: String = self
.0
.iter()
.filter_map(|m| m.reasoning_content.clone())
.collect();
if text.is_empty() { None } else { Some(text) }
}
fn content(&self) -> Option<String> {
let text: String = self.0.iter().map(|m| m.content.as_str()).collect();
if text.is_empty() { None } else { Some(text) }
}
fn tool_calls(&self) -> Vec<agent_context::ToolCallInfo> {
self.0
.iter()
.filter_map(|m| m.tool_calls.as_ref())
.flatten()
.map(|tc| agent_context::ToolCallInfo {
id: tc.id.clone(),
name: tc.function_name.clone(),
arguments: tc.arguments.clone(),
})
.collect()
}
}
impl ContextBackend for RichMockBackend {
type Message = RichMockMessage;
type Opts = RichMockOpts;
type Response = RichMockResponse;
fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
RichMockMessage::user(content)
}
fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
RichMockMessage::system(content)
}
fn tool_message(
&self,
tool_call_id: impl Into<String> + Send,
content: impl Into<String> + Send,
) -> Self::Message {
RichMockMessage::tool_result(tool_call_id, content)
}
fn extract_messages_from_backend_response(
&self,
responses: &[Self::Response],
) -> Result<Vec<Self::Message>, AgentError> {
Ok(responses.iter().flat_map(|r| r.0.iter().cloned()).collect())
}
fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
None
}
fn context_window(&self) -> usize {
10000
}
async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
Ok(self.token_count)
}
async fn send(
&self,
_messages: &[Self::Message],
opts: &Self::Opts,
) -> Result<Self::Response, AgentError> {
self.last_opts_token
.store(opts.token, std::sync::atomic::Ordering::SeqCst);
let idx = self
.call_index
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let responses = &self.send_responses;
if idx < responses.len() {
Ok(RichMockResponse(responses[idx].clone()))
} else {
Ok(RichMockResponse(vec![]))
}
}
fn send_stream(
&self,
_messages: Vec<Self::Message>,
opts: Self::Opts,
) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static {
self.last_opts_token
.store(opts.token, std::sync::atomic::Ordering::SeqCst);
let idx = self
.call_index
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let responses = &self.send_responses;
let chunks = if idx < responses.len() {
responses[idx].clone()
} else {
vec![]
};
futures::stream::iter(chunks.into_iter().map(|m| Ok(RichMockResponse(vec![m]))))
}
}
fn spawn_with_backend(backend: RichMockBackend) -> ActorRef<AgentContext<RichMockBackend>> {
let ctx = AgentContext::new(backend, vec![]);
AgentContext::spawn(ctx)
}
fn spawn_with_on_change(
backend: RichMockBackend,
on_change: impl Fn(ChangeEvent<RichMockMessage>) + Send + Sync + 'static,
) -> ActorRef<AgentContext<RichMockBackend>> {
let ctx = AgentContext::new(backend, vec![]).with_on_change(on_change);
AgentContext::spawn(ctx)
}
#[tokio::test]
async fn basic_conversation() {
let backend = RichMockBackend::new(vec![vec![RichMockMessage::assistant("Hello!")]]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("Hi"),
})
.await
.unwrap();
let raw: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert_eq!(raw.0[0].content, "Hello!");
let all: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
assert_eq!(all.len(), 2);
assert_eq!(all[0].role, Role::User);
assert_eq!(all[1].role, Role::Assistant);
assert_eq!(all[1].content, "Hello!");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn reasoning_stripped_without_tool_call() {
let backend = RichMockBackend::new(vec![vec![RichMockMessage::assistant_with_reasoning(
"答案是42",
"让我想想...",
)]]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("问题"),
})
.await
.unwrap();
let raw: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert!(
raw.0[0].reasoning_content.is_some(),
"原始响应应含 reasoning"
);
let all: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
let stored_assistant = &all[1];
assert_eq!(stored_assistant.role, Role::Assistant);
assert!(
stored_assistant.reasoning_content.is_none(),
"非工具调用场景应剥离 reasoning_content"
);
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn reasoning_preserved_with_tool_call() {
let backend = RichMockBackend::new(vec![vec![RichMockMessage::assistant_with_tool_calls(
"",
"需要调用工具",
vec![ToolCall {
id: "call_1".into(),
function_name: "get_weather".into(),
arguments: r#"{"city":"Beijing"}"#.into(),
}],
)]]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("北京天气"),
})
.await
.unwrap();
let _raw: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
let all: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
let stored_assistant = &all[1];
assert_eq!(stored_assistant.role, Role::Assistant);
assert!(
stored_assistant.reasoning_content.is_some(),
"工具调用场景应保留 reasoning_content"
);
assert!(stored_assistant.tool_calls.is_some());
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn full_tool_call_cycle() {
let backend = RichMockBackend::new(vec![
vec![RichMockMessage::assistant_with_tool_calls(
"",
"需要查询天气",
vec![ToolCall {
id: "call_1".into(),
function_name: "get_weather".into(),
arguments: r#"{"city":"Beijing"}"#.into(),
}],
)],
vec![RichMockMessage::assistant_with_reasoning(
"北京今天晴,25°C",
"根据工具返回的数据分析",
)],
]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("北京天气怎么样?"),
})
.await
.unwrap();
let raw1: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert!(raw1.0[0].tool_calls.is_some());
let after_r1: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
assert_eq!(after_r1.len(), 2); assert!(
after_r1[1].reasoning_content.is_some(),
"工具调用 assistant 保留 reasoning"
);
actor
.ask(AppendMsg {
message: RichMockMessage::tool_result("call_1", r#"{"temp":25,"condition":"sunny"}"#),
})
.await
.unwrap();
let raw2: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert!(
raw2.0[0].reasoning_content.is_some(),
"原始响应含 reasoning"
);
let final_msgs: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
assert_eq!(final_msgs.len(), 4);
assert_eq!(final_msgs[0].role, Role::User);
assert_eq!(final_msgs[1].role, Role::Assistant);
assert!(final_msgs[1].tool_calls.is_some());
assert!(
final_msgs[1].reasoning_content.is_some(),
"工具调用 assistant 保留 reasoning"
);
assert_eq!(final_msgs[2].role, Role::Tool);
assert_eq!(final_msgs[2].tool_call_id.as_deref(), Some("call_1"));
assert_eq!(final_msgs[3].role, Role::Assistant);
assert!(
final_msgs[3].reasoning_content.is_none(),
"非工具调用 assistant 应剥离 reasoning"
);
assert_eq!(final_msgs[3].content, "北京今天晴,25°C");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_returns_raw_to_caller() {
let backend = RichMockBackend::new(vec![vec![RichMockMessage::assistant_with_reasoning(
"答案",
"思考过程",
)]]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("问"),
})
.await
.unwrap();
let raw: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert_eq!(raw.0[0].content, "答案");
assert!(
raw.0[0].reasoning_content.is_some(),
"调用方应拿到原始响应(含 reasoning)"
);
let stored: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
assert!(stored[1].reasoning_content.is_none(), "AC 内部存储应已剥离");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_stream_tool_call() {
let backend = RichMockBackend::new(vec![vec![RichMockMessage::assistant_with_tool_calls(
"",
"调用工具",
vec![ToolCall {
id: "call_s".into(),
function_name: "search".into(),
arguments: r#"{"q":"test"}"#.into(),
}],
)]]);
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("搜索"),
})
.await
.unwrap();
{
use futures::StreamExt;
let mut stream = actor
.ask(SendStreamMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
while stream.next().await.is_some() {}
}
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
let stored: Vec<RichMockMessage> = actor.ask(MessagesMsg).await.unwrap();
assert_eq!(stored.len(), 2);
assert!(stored[1].tool_calls.is_some());
assert!(
stored[1].reasoning_content.is_some(),
"流式工具调用应保留 reasoning_content"
);
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn on_change_event_sequence() {
let events = Arc::new(EventCollector::new(8));
let events_clone = events.clone();
let backend = RichMockBackend::new(vec![
vec![RichMockMessage::assistant("R1")],
vec![RichMockMessage::assistant("R2")],
]);
let actor = spawn_with_on_change(backend, move |event: ChangeEvent<RichMockMessage>| {
let label = match &event {
ChangeEvent::Appended(m) => format!("Appended({})", m.content),
ChangeEvent::Updated { index, old, new } => {
format!("Updated({}, {}→{})", index, old.content, new.content)
}
ChangeEvent::Removed { index, .. } => format!("Removed({})", index),
ChangeEvent::Cleared { .. } => "Cleared".into(),
_ => "Other".into(),
};
events_clone.push(label);
});
actor
.ask(AppendMsg {
message: RichMockMessage::user("Q1"),
})
.await
.unwrap();
let _: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
actor
.ask(AppendMsg {
message: RichMockMessage::user("Q2"),
})
.await
.unwrap();
let _: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
let captured = events.collect();
assert_eq!(captured.len(), 4, "应有 4 个 Appended 事件");
assert_eq!(captured[0], "Appended(Q1)");
assert_eq!(captured[1], "Appended(R1)");
assert_eq!(captured[2], "Appended(Q2)");
assert_eq!(captured[3], "Appended(R2)");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn on_change_tracks_tool_call_cycle() {
let events = Arc::new(EventCollector::new(8));
let events_clone = events.clone();
let backend = RichMockBackend::new(vec![
vec![RichMockMessage::assistant_with_tool_calls(
"",
"调用工具",
vec![ToolCall {
id: "tc1".into(),
function_name: "fn".into(),
arguments: "{}".into(),
}],
)],
vec![RichMockMessage::assistant("Done")],
]);
let actor = spawn_with_on_change(backend, move |event: ChangeEvent<RichMockMessage>| {
if let ChangeEvent::Appended(m) = &event {
let label = match m.role() {
Role::User => format!("User({})", m.content),
Role::Assistant => {
let has_tc = if m.tool_calls.is_some() { "+tc" } else { "" };
format!("Assistant({}){}", m.content, has_tc)
}
Role::Tool => format!("Tool({})", m.content),
Role::System => format!("System({})", m.content),
};
events_clone.push(label);
}
});
actor
.ask(AppendMsg {
message: RichMockMessage::user("Q"),
})
.await
.unwrap();
let _: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
actor
.ask(AppendMsg {
message: RichMockMessage::tool_result("tc1", "result"),
})
.await
.unwrap();
let _: RichMockResponse = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
let captured = events.collect();
assert_eq!(
captured.len(),
4,
"应有 4 个 Appended 事件:user, assistant+tc, tool, assistant"
);
assert_eq!(captured[0], "User(Q)");
assert_eq!(captured[1], "Assistant()+tc");
assert_eq!(captured[2], "Tool(result)");
assert_eq!(captured[3], "Assistant(Done)");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn opts_passthrough_to_backend() {
let last_opts_token = Arc::new(std::sync::atomic::AtomicU64::new(0));
let mut backend = RichMockBackend::new(vec![
vec![RichMockMessage::assistant("R1")],
vec![RichMockMessage::assistant("R2")],
]);
backend.last_opts_token = last_opts_token.clone();
let actor = spawn_with_backend(backend);
actor
.ask(AppendMsg {
message: RichMockMessage::user("Q1"),
})
.await
.unwrap();
let _ = actor
.ask(SendMsg {
opts: RichMockOpts {
token: 100,
..Default::default()
},
})
.await
.unwrap();
assert_eq!(
last_opts_token.load(Ordering::SeqCst),
100,
"Send 的 opts 应透传到 backend.send()"
);
let _ = actor
.ask(SendStreamMsg {
opts: RichMockOpts {
token: 200,
..Default::default()
},
})
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
assert_eq!(
last_opts_token.load(Ordering::SeqCst),
200,
"SendStream 的 opts 应透传到 backend.send_stream()"
);
actor.stop_gracefully().await.unwrap();
}