mod actor;
mod events;
mod stream;
mod types;
pub use actor::AgentContext;
pub use events::{
CompressStrategy, NotifyChange, NotifyCompressedForReply, RequestAppend, RequestClear,
RequestCompress, RequestCompressed, RequestEstimateTokens, RequestExtend, RequestFindByRole,
RequestFromJsonl, RequestGet, RequestImmutable, RequestIncremental, RequestInsert,
RequestIsEmpty, RequestLen, RequestMessages, RequestPop, RequestRemove, RequestRetain,
RequestSend, RequestSendStream, RequestSubscribeChange, RequestSubscribeCompressed,
RequestToJsonl, RequestUnsubscribeChange, RequestUnsubscribeCompressed, RequestUpdate,
};
pub use stream::AgentSendStream;
pub use types::{
CommonOpts, ContextBackend, ContextBackendResponse, ResponseType, StreamEvent, ToolCallInfo,
};
#[cfg(test)]
mod tests {
use super::actor::AgentContext;
use super::events::{
CompressStrategy, NotifyChange, RequestAppend, RequestClear, RequestCompress,
RequestCompressed, RequestEstimateTokens, RequestExtend, RequestFindByRole,
RequestFromJsonl, RequestGet, RequestIncremental, RequestInsert, RequestIsEmpty,
RequestLen, RequestMessages, RequestPop, RequestRemove, RequestRetain, RequestSend,
RequestSendStream, RequestSubscribeChange, RequestToJsonl, RequestUpdate,
};
use super::types::{
CommonOpts, ContextBackend, ContextBackendResponse, ResponseType, ToolCallInfo,
};
use crate::error::AgentError;
use crate::message::ContextMessage;
use crate::role::Role;
use kameo::prelude::*;
use std::sync::Arc;
/// Minimal message type used as the backend message throughout these tests.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MockMessage {
    /// Role the message is attributed to.
    pub role: Role,
    /// Plain-text body of the message.
    pub content: String,
}

/// The mock carries no reasoning payload, so the reasoning-related hooks are
/// trivial: nothing is preserved and stripping returns the message untouched.
impl ContextMessage for MockMessage {
    fn role(&self) -> Role {
        self.role
    }

    fn with_role(self, role: Role) -> Self {
        // Rebuild with the new role while keeping every other field.
        Self { role, ..self }
    }

    fn preserve_reasoning(&self) -> bool {
        false
    }

    fn without_reasoning(self) -> Self {
        self
    }
}
/// Treats a list of mock messages as a backend response whose text is the
/// concatenation of every message's `content`.
impl ContextBackendResponse for Vec<MockMessage> {
    /// `Empty` when no message carries any text (including an empty list),
    /// `Content` otherwise.
    fn response_type(&self) -> ResponseType {
        let has_text = self.iter().any(|msg| !msg.content.is_empty());
        if has_text {
            ResponseType::Content
        } else {
            ResponseType::Empty
        }
    }

    /// The mock never produces reasoning content.
    fn reasoning_content(&self) -> Option<String> {
        None
    }

    /// Concatenated message text, or `None` when that text is empty.
    fn content(&self) -> Option<String> {
        // Empty contents contribute nothing to the concatenation, so no
        // explicit filtering is required.
        let joined = self
            .iter()
            .map(|msg| msg.content.as_str())
            .collect::<String>();
        (!joined.is_empty()).then_some(joined)
    }

    /// The mock never produces tool calls.
    fn tool_calls(&self) -> Vec<ToolCallInfo> {
        vec![]
    }
}
/// Backend options used by the tests: the shared [`CommonOpts`] plus an
/// arbitrary `token` that lets a test recognise which options reached the backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MockOpts {
    /// Options common to every backend.
    pub common: CommonOpts,
    /// Marker value recorded by the mock backend when it is called.
    pub token: u64,
}

/// Exposes the embedded [`CommonOpts`] by reference.
impl AsRef<CommonOpts> for MockOpts {
    fn as_ref(&self) -> &CommonOpts {
        let MockOpts { common, .. } = self;
        common
    }
}
/// Builds the default test options (model `"test"`, no auto-compress, no
/// scratch text) tagged with the given `token`.
fn mock_opts(token: u64) -> MockOpts {
    let common = CommonOpts {
        model: "test".into(),
        context_window: 1000,
        max_tokens: 4096,
        auto_compress: false,
        scratch: None,
    };
    MockOpts { common, token }
}
/// Configurable test double for [`ContextBackend`].
///
/// `send` and `send_stream` replay `send_response` and record how they were
/// called in the shared atomics. Because the atomics live behind `Arc`, a test
/// can keep a clone of the backend (or of the `Arc`s) and inspect the recorded
/// values after handing the backend to an actor.
#[derive(Clone)]
pub struct MockBackend {
    /// Messages replayed by `send` (as a single response) and by
    /// `send_stream` (one single-message response per entry).
    pub send_response: Vec<MockMessage>,
    /// Value returned by `estimate_tokens`.
    pub token_count: usize,
    /// `token` of the options passed to the most recent `send` / `send_stream` call.
    pub last_opts_token: Arc<std::sync::atomic::AtomicU64>,
    /// Number of messages passed to the most recent `send` / `send_stream` call.
    pub last_sent_count: Arc<std::sync::atomic::AtomicUsize>,
}

impl ContextBackend for MockBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Builds a `Role::User` message with the given text.
    fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: content.into(),
        }
    }

    /// Builds a `Role::System` message with the given text.
    fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: content.into(),
        }
    }

    /// Builds a `Role::Tool` message with the given text.
    ///
    /// # Panics
    /// Panics when `tool_call_id` is empty; the id itself is not stored on the mock message.
    fn tool_message(
        &self,
        tool_call_id: impl Into<String> + Send,
        content: impl Into<String> + Send,
    ) -> Self::Message {
        let id = tool_call_id.into();
        assert!(!id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: content.into(),
        }
    }

    /// Flattens every response into a single list of cloned messages.
    fn extract_messages(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
    }

    /// The mock never merges stream chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Returns the configured `token_count` regardless of the input.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(self.token_count)
    }

    /// Records the options token and the number of messages received, then
    /// replies with a copy of `send_response`.
    async fn send(
        &self,
        messages: &[Self::Message],
        opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        Ok(self.send_response.clone())
    }

    /// Records the options token and the number of messages received, then
    /// returns a stream yielding each entry of `send_response` as its own
    /// single-message response.
    ///
    /// Recording happens when this method is called, not when the stream is polled.
    fn send_stream(
        &self,
        messages: Vec<Self::Message>,
        opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        // Mirror `send`: keep the sent-message count observable for streamed calls too.
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        let responses = self.send_response.clone();
        futures::stream::iter(responses.into_iter().map(|m| Ok(vec![m])))
    }
}
/// Spawns an [`AgentContext`] actor seeded with `immutable` messages and backed
/// by a [`MockBackend`] that replies with nothing and estimates zero tokens.
fn spawn_actor(immutable: Vec<MockMessage>) -> ActorRef<AgentContext<MockBackend>> {
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        // Both recorders start at zero, which is the atomics' default value.
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    AgentContext::spawn(AgentContext::new(backend, immutable))
}
/// A context created without any messages reports length zero and is empty.
#[tokio::test]
async fn new_creates_empty_zones() {
    let ctx = spawn_actor(Vec::new());

    let total: usize = ctx.ask(RequestLen).await.unwrap();
    assert_eq!(total, 0);

    let is_empty: bool = ctx.ask(RequestIsEmpty).await.unwrap();
    assert!(is_empty);

    ctx.stop_gracefully().await.unwrap();
}
/// Messages passed at construction time count towards the length, so the
/// context is not empty.
#[tokio::test]
async fn new_with_immutable() {
    let system = MockMessage {
        role: Role::System,
        content: "system".into(),
    };
    let ctx = spawn_actor(vec![system]);

    let total: usize = ctx.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    let is_empty: bool = ctx.ask(RequestIsEmpty).await.unwrap();
    assert!(!is_empty);

    ctx.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn append_and_on_change() {
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Actor)]
struct ChangeCounter {
count: Arc<AtomicUsize>,
}
impl Message<NotifyChange<MockMessage>> for ChangeCounter {
type Reply = ();
async fn handle(
&mut self,
_change: NotifyChange<MockMessage>,
_ctx: &mut Context<Self, ()>,
) {
self.count.fetch_add(1, Ordering::SeqCst);
}
}
let count = Arc::new(AtomicUsize::new(0));
let counter = ChangeCounter::spawn(ChangeCounter {
count: count.clone(),
});
let recipient = counter.recipient::<NotifyChange<MockMessage>>();
let backend = MockBackend {
send_response: vec![],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor = AgentContext::spawn(ctx);
actor
.ask(RequestSubscribeChange { recipient })
.await
.unwrap();
actor
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "hi".into(),
},
})
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(count.load(Ordering::SeqCst), 1);
actor.stop_gracefully().await.unwrap();
}
/// An appended message can be read back by index.
#[tokio::test]
async fn append_and_get() {
    let actor = spawn_actor(Vec::new());

    let greeting = MockMessage {
        role: Role::User,
        content: "hi".into(),
    };
    actor
        .ask(RequestAppend { message: greeting })
        .await
        .unwrap();

    let stored: Option<MockMessage> = actor.ask(RequestGet(0)).await.unwrap();
    // A single comparison covers both "present" and "has the right content".
    assert_eq!(stored.map(|m| m.content).as_deref(), Some("hi"));

    actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn extend_multiple() {
let actor = spawn_actor(vec![]);
actor
.ask(RequestExtend {
messages: vec![
MockMessage {
role: Role::User,
content: "a".into(),
},
MockMessage {
role: Role::Assistant,
content: "b".into(),
},
],
})
.await
.unwrap();
let len: usize = actor.ask(RequestLen).await.unwrap();
assert_eq!(len, 2);
actor.stop_gracefully().await.unwrap();
}
/// Index lookup spans the construction-time messages followed by appended
/// ones, and yields `None` past the end.
#[tokio::test]
async fn get_crosses_zones() {
    let actor = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: "sys".into(),
    }]);
    actor
        .ask(RequestAppend {
            message: MockMessage {
                role: Role::User,
                content: "user".into(),
            },
        })
        .await
        .unwrap();

    // Index 0 is the construction-time message, index 1 the appended one.
    for (index, expected) in [(0, "sys"), (1, "user")] {
        let found: Option<MockMessage> = actor.ask(RequestGet(index)).await.unwrap();
        assert_eq!(found.unwrap().content, expected);
    }

    let missing: Option<MockMessage> = actor.ask(RequestGet(99)).await.unwrap();
    assert!(missing.is_none());

    actor.stop_gracefully().await.unwrap();
}
/// `RequestMessages` returns construction-time messages followed by appended ones.
#[tokio::test]
async fn messages_returns_all_zones() {
    let actor = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: "sys".into(),
    }]);
    actor
        .ask(RequestAppend {
            message: MockMessage {
                role: Role::User,
                content: "u".into(),
            },
        })
        .await
        .unwrap();

    let everything: Vec<MockMessage> = actor.ask(RequestMessages).await.unwrap();
    // Comparing the whole list checks the count and the order in one go.
    let contents: Vec<&str> = everything.iter().map(|m| m.content.as_str()).collect();
    assert_eq!(contents, ["sys", "u"]);

    actor.stop_gracefully().await.unwrap();
}
/// `RequestIncremental` returns the appended messages.
#[tokio::test]
async fn incremental_readonly() {
    let actor = spawn_actor(Vec::new());

    let only = MockMessage {
        role: Role::User,
        content: "x".into(),
    };
    actor.ask(RequestAppend { message: only }).await.unwrap();

    let incremental: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 1);

    actor.stop_gracefully().await.unwrap();
}
/// `RequestFindByRole` returns only the messages carrying the requested role.
#[tokio::test]
async fn find_by_role() {
    let actor = spawn_actor(Vec::new());

    // Two user messages with an assistant message in between.
    for (role, text) in [
        (Role::User, "u1"),
        (Role::Assistant, "a1"),
        (Role::User, "u2"),
    ] {
        let message = MockMessage {
            role,
            content: text.into(),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    let from_user: Vec<MockMessage> = actor.ask(RequestFindByRole(Role::User)).await.unwrap();
    assert_eq!(from_user.len(), 2);

    actor.stop_gracefully().await.unwrap();
}
/// Updating an existing index replaces the stored message.
#[tokio::test]
async fn update_message() {
    let user = |text: &str| MockMessage {
        role: Role::User,
        content: text.into(),
    };
    let actor = spawn_actor(Vec::new());

    actor
        .ask(RequestAppend {
            message: user("old"),
        })
        .await
        .unwrap();
    actor
        .ask(RequestUpdate {
            index: 0,
            message: user("new"),
        })
        .await
        .unwrap();

    let current: Option<MockMessage> = actor.ask(RequestGet(0)).await.unwrap();
    assert_eq!(current.unwrap().content, "new");

    actor.stop_gracefully().await.unwrap();
}
/// Updating an index that does not exist is reported as an error.
#[tokio::test]
async fn update_out_of_bounds() {
    let actor = spawn_actor(Vec::new());

    let replacement = MockMessage {
        role: Role::User,
        content: "x".into(),
    };
    // The context is empty, so index 0 is already out of range.
    let outcome = actor
        .ask(RequestUpdate {
            index: 0,
            message: replacement,
        })
        .await;
    assert!(outcome.is_err());

    actor.stop_gracefully().await.unwrap();
}
/// Inserting at index 0 places the new message before the existing one.
#[tokio::test]
async fn insert_message() {
    let user = |text: &str| MockMessage {
        role: Role::User,
        content: text.into(),
    };
    let actor = spawn_actor(Vec::new());

    actor
        .ask(RequestAppend { message: user("a") })
        .await
        .unwrap();
    actor
        .ask(RequestInsert {
            index: 0,
            message: user("b"),
        })
        .await
        .unwrap();

    let incremental: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental[0].content, "b");
    assert_eq!(incremental[1].content, "a");

    actor.stop_gracefully().await.unwrap();
}
/// Removing index 0 leaves only the second message.
#[tokio::test]
async fn remove_message() {
    let actor = spawn_actor(Vec::new());

    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.into(),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    actor.ask(RequestRemove { index: 0 }).await.unwrap();

    let remaining: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].content, "b");

    actor.stop_gracefully().await.unwrap();
}
/// Popping returns the most recently appended message and shrinks the context by one.
#[tokio::test]
async fn pop_message() {
    let actor = spawn_actor(Vec::new());

    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.into(),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    let last: Option<MockMessage> = actor.ask(RequestPop).await.unwrap();
    assert_eq!(last.unwrap().content, "b");

    let total: usize = actor.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    actor.stop_gracefully().await.unwrap();
}
/// Retaining by `Role::User` drops the assistant message and keeps both user messages.
#[tokio::test]
async fn retain_by_role() {
    let actor = spawn_actor(Vec::new());

    for (role, text) in [
        (Role::User, "u"),
        (Role::Assistant, "a"),
        (Role::User, "u2"),
    ] {
        let message = MockMessage {
            role,
            content: text.into(),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    actor.ask(RequestRetain { role: Role::User }).await.unwrap();

    let kept: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(kept.len(), 2);
    // No survivor may carry a role other than `User`.
    assert!(!kept.iter().any(|m| m.role() != Role::User));

    actor.stop_gracefully().await.unwrap();
}
/// Clearing drops the appended message while the construction-time message remains.
#[tokio::test]
async fn clear_incremental() {
    let system = MockMessage {
        role: Role::System,
        content: "sys".into(),
    };
    let actor = spawn_actor(vec![system]);

    let appended = MockMessage {
        role: Role::User,
        content: "u".into(),
    };
    actor
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    actor.ask(RequestClear).await.unwrap();

    let total: usize = actor.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    actor.stop_gracefully().await.unwrap();
}
/// Clearing a context that holds nothing succeeds without error.
#[tokio::test]
async fn clear_empty_is_noop() {
    let empty_ctx = spawn_actor(Vec::new());
    empty_ctx.ask(RequestClear).await.unwrap();
    empty_ctx.stop_gracefully().await.unwrap();
}
/// Summarize-compressing five messages with `keep: 2` is expected to leave two
/// incremental messages and a single compressed message with the `System` role.
#[tokio::test]
async fn compress_summarize() {
    // The backend answers the summarisation request with this assistant reply.
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "摘要".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    for i in 0..5 {
        let message = MockMessage {
            role: Role::User,
            content: format!("msg{i}"),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    let strategy = CompressStrategy::Summarize {
        keep: 2,
        prompt: None,
    };
    actor
        .ask(RequestCompress {
            strategy,
            opts: mock_opts(0),
        })
        .await
        .unwrap();

    let incremental: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 2);

    let compressed: Vec<MockMessage> = actor.ask(RequestCompressed).await.unwrap();
    assert_eq!(compressed.len(), 1);
    assert_eq!(compressed[0].role, Role::System);

    actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn compress_noop_when_below_keep() {
let backend = MockBackend {
send_response: vec![],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor = AgentContext::spawn(ctx);
actor
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "only".into(),
},
})
.await
.unwrap();
actor
.ask(RequestCompress {
strategy: CompressStrategy::Summarize {
keep: 5,
prompt: None,
},
opts: mock_opts(0),
})
.await
.unwrap();
let inc: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
assert_eq!(inc.len(), 1);
actor.stop_gracefully().await.unwrap();
}
/// `RequestSend` returns the backend response and stores the reply among the
/// incremental messages.
#[tokio::test]
async fn send_appends_to_incremental() {
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "reply".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    let response = actor.ask(RequestSend { opts: mock_opts(0) }).await.unwrap();
    assert_eq!(response[0].content, "reply");

    let incremental: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 1);
    assert_eq!(incremental[0].content, "reply");

    actor.stop_gracefully().await.unwrap();
}
/// Drains a `RequestSendStream` stream, then appends every collected chunk
/// message back into the context and expects both to be stored.
#[tokio::test]
async fn send_stream_collects_and_stores() {
    let assistant = |text: &str| MockMessage {
        role: Role::Assistant,
        content: text.into(),
    };
    let backend = MockBackend {
        send_response: vec![assistant("chunk1"), assistant("chunk2")],
        token_count: 0,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    // Scoped so the stream is dropped before the context is queried again.
    {
        use futures::StreamExt;

        let mut stream = actor
            .ask(RequestSendStream { opts: mock_opts(0) })
            .await
            .unwrap();

        let mut yielded = 0usize;
        while stream.next().await.is_some() {
            yielded += 1;
        }
        assert!(yielded > 0, "Stream should have at least one item");

        // Replay every message of every collected chunk into the context.
        for message in stream.take_chunks().into_iter().flatten() {
            actor.ask(RequestAppend { message }).await.unwrap();
        }
    }

    let incremental: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 2);

    actor.stop_gracefully().await.unwrap();
}
/// `RequestEstimateTokens` reports the value produced by the backend.
#[tokio::test]
async fn estimate_tokens_delegates() {
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 42,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    let estimate: usize = actor.ask(RequestEstimateTokens).await.unwrap();
    assert_eq!(estimate, 42);

    actor.stop_gracefully().await.unwrap();
}
/// The options given to `RequestSend` must reach `backend.send()` unchanged,
/// observed through the token the mock records.
#[tokio::test]
async fn send_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Keep a handle on the recorder instead of cloning the whole backend.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    let response = actor
        .ask(RequestSend {
            opts: mock_opts(42),
        })
        .await
        .unwrap();
    assert_eq!(
        response.len(),
        0,
        "Empty response expected for opts passthrough test"
    );

    // Message text: "RequestSend should pass opts through to backend.send()".
    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        42,
        "RequestSend 应将 opts 透传给 backend.send()"
    );

    actor.stop_gracefully().await.unwrap();
}
/// The options given to `RequestSendStream` must reach `backend.send_stream()`
/// unchanged, observed through the token the mock records.
#[tokio::test]
async fn send_stream_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Keep a handle on the recorder instead of cloning the whole backend.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    // Scoped so the stream is dropped before the recorder is inspected.
    {
        use futures::StreamExt;

        let mut stream = actor
            .ask(RequestSendStream { opts: mock_opts(7) })
            .await
            .unwrap();

        let mut yielded = 0usize;
        while stream.next().await.is_some() {
            yielded += 1;
        }
        assert_eq!(yielded, 0, "Empty stream expected for opts passthrough test");
    }

    // Message text: "RequestSendStream should pass opts through to backend.send_stream()".
    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        7,
        "RequestSendStream 应将 opts 透传给 backend.send_stream()"
    );

    actor.stop_gracefully().await.unwrap();
}
/// The options given to `RequestCompress` must reach `backend.send()`
/// unchanged, observed through the token the mock records.
#[tokio::test]
async fn compress_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Keep a handle on the recorder instead of cloning the whole backend.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "摘要".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::default(),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    for i in 0..5 {
        let message = MockMessage {
            role: Role::User,
            content: format!("msg{i}"),
        };
        actor.ask(RequestAppend { message }).await.unwrap();
    }

    let strategy = CompressStrategy::Summarize {
        keep: 2,
        prompt: None,
    };
    actor
        .ask(RequestCompress {
            strategy,
            opts: mock_opts(99),
        })
        .await
        .unwrap();

    // Message text: "RequestCompress should pass opts through to backend.send()".
    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        99,
        "RequestCompress 应将 opts 透传给 backend.send()"
    );

    actor.stop_gracefully().await.unwrap();
}
/// Backend stub whose `extract_messages` always fails.
///
/// The remaining methods are benign: the message constructors build
/// [`MockMessage`]s, `send` returns an empty response, `estimate_tokens`
/// returns zero and `send_stream` yields nothing.
#[derive(Clone)]
struct ExtractErrorBackend;

impl ContextBackend for ExtractErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    fn user_message(&self, text: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: text.into(),
        }
    }

    fn system_message(&self, text: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: text.into(),
        }
    }

    /// Panics when the tool-call id is empty; the id is not stored.
    fn tool_message(
        &self,
        call_id: impl Into<String> + Send,
        text: impl Into<String> + Send,
    ) -> Self::Message {
        let call_id: String = call_id.into();
        assert!(!call_id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: text.into(),
        }
    }

    /// The failure under test: extraction is always rejected.
    /// (Error text: "simulated extraction failure".)
    fn extract_messages(
        &self,
        _responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Err(AgentError::Context("模拟提取失败".into()))
    }

    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(0)
    }

    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
#[derive(Clone)]
struct ToRequestErrorBackend;
impl ContextBackend for ToRequestErrorBackend {
type Message = MockMessage;
type Opts = MockOpts;
type Response = Vec<MockMessage>;
fn to_request_messages(&self, _: Vec<MockMessage>) -> Result<Vec<MockMessage>, AgentError> {
Err(AgentError::Context("模拟转换失败".into()))
}
fn merge_chunks(&self, _: &[Self::Response]) -> Option<Self::Message> {
None
}
fn user_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::User,
content: c.into(),
}
}
fn system_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::System,
content: c.into(),
}
}
fn tool_message(
&self,
id: impl Into<String> + Send,
c: impl Into<String> + Send,
) -> MockMessage {
let tool_call_id = id.into();
assert!(!tool_call_id.is_empty(), "tool_call_id should not be empty");
MockMessage {
role: Role::Tool,
content: c.into(),
}
}
fn extract_messages(
&self,
responses: &[Vec<MockMessage>],
) -> Result<Vec<MockMessage>, AgentError> {
Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
}
async fn estimate_tokens(&self, _: &[MockMessage]) -> Result<usize, AgentError> {
Ok(0)
}
async fn send(
&self,
_: &[MockMessage],
_: &MockOpts,
) -> Result<Vec<MockMessage>, AgentError> {
Ok(vec![])
}
fn send_stream(
&self,
_: Vec<MockMessage>,
_: MockOpts,
) -> impl futures_core::Stream<Item = Result<Vec<MockMessage>, AgentError>> + Send + 'static
{
futures::stream::empty()
}
}
/// Backend stub whose `estimate_tokens` always fails.
///
/// The remaining methods are benign: the message constructors build
/// [`MockMessage`]s, `extract_messages` flattens the responses, `send` returns
/// an empty response and `send_stream` yields nothing.
#[derive(Clone)]
struct EstimateErrorBackend;

impl ContextBackend for EstimateErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    fn user_message(&self, text: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: text.into(),
        }
    }

    fn system_message(&self, text: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: text.into(),
        }
    }

    /// Panics when the tool-call id is empty; the id is not stored.
    fn tool_message(
        &self,
        call_id: impl Into<String> + Send,
        text: impl Into<String> + Send,
    ) -> Self::Message {
        let call_id: String = call_id.into();
        assert!(!call_id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: text.into(),
        }
    }

    /// The failure under test: token estimation is always rejected.
    /// (Error text: "simulated estimation failure".)
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Err(AgentError::Context("模拟估算失败".into()))
    }

    fn extract_messages(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        let flattened = responses
            .iter()
            .flat_map(|response| response.iter().cloned())
            .collect();
        Ok(flattened)
    }

    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
/// `RequestSend` must fail when the backend cannot extract messages from its response.
#[tokio::test]
async fn send_extract_error_propagates() {
    let actor = AgentContext::spawn(AgentContext::new(ExtractErrorBackend, Vec::new()));

    let outcome = actor.ask(RequestSend { opts: mock_opts(0) }).await;
    // Message text: "an extract failure should propagate as an error".
    assert!(outcome.is_err(), "extract 失败应传播错误");

    actor.stop_gracefully().await.unwrap();
}
/// `RequestSend` must fail when the backend cannot convert the context into
/// request messages.
#[tokio::test]
async fn send_to_request_error_propagates() {
    let actor = AgentContext::spawn(AgentContext::new(ToRequestErrorBackend, Vec::new()));

    let outcome = actor.ask(RequestSend { opts: mock_opts(0) }).await;
    // Message text: "a to_request failure should propagate as an error".
    assert!(outcome.is_err(), "to_request 失败应传播错误");

    actor.stop_gracefully().await.unwrap();
}
/// When the backend fails to estimate tokens, `RequestEstimateTokens` is
/// expected to succeed and report zero instead of an error.
#[tokio::test]
async fn estimate_tokens_error_returns_zero() {
    let actor = AgentContext::spawn(AgentContext::new(EstimateErrorBackend, Vec::new()));

    let estimate: usize = actor.ask(RequestEstimateTokens).await.unwrap();
    // Message text: "an estimation failure should degrade to returning 0".
    assert_eq!(estimate, 0, "估算失败应降级返回 0");

    actor.stop_gracefully().await.unwrap();
}
/// The JSONL export contains one line per message, construction-time message first.
#[tokio::test]
async fn to_jsonl_exports_all_messages() {
    let system = MockMessage {
        role: Role::System,
        content: "sys".into(),
    };
    let actor = spawn_actor(vec![system]);

    let appended = MockMessage {
        role: Role::User,
        content: "hello".into(),
    };
    actor
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    let exported: String = actor.ask(RequestToJsonl).await.unwrap();
    let rows: Vec<&str> = exported.lines().collect();
    assert_eq!(rows.len(), 2);
    // Each line must mention the content of the message at the same position.
    for (row, needle) in rows.iter().zip(["sys", "hello"]) {
        assert!(row.contains(needle));
    }

    actor.stop_gracefully().await.unwrap();
}
/// Loading a two-line JSONL document yields two messages in document order.
#[tokio::test]
async fn from_jsonl_loads_messages() {
    let actor = spawn_actor(Vec::new());

    // The second line deliberately starts at column 0: leading whitespace
    // would become part of the raw string.
    let jsonl = r#"{"role":"User","content":"hello"}
{"role":"Assistant","content":"hi there"}"#;
    let request = RequestFromJsonl {
        jsonl: jsonl.into(),
    };
    actor.ask(request).await.unwrap();

    let total: usize = actor.ask(RequestLen).await.unwrap();
    assert_eq!(total, 2);

    let loaded: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
    assert_eq!(loaded[0].content, "hello");
    assert_eq!(loaded[1].content, "hi there");

    actor.stop_gracefully().await.unwrap();
}
/// Blank lines around a JSONL record are ignored: only the one record is loaded.
#[tokio::test]
async fn from_jsonl_skips_empty_lines() {
    let actor = spawn_actor(Vec::new());

    let padded = "\n\n{\"role\":\"User\",\"content\":\"only\"}\n\n";
    let request = RequestFromJsonl {
        jsonl: padded.into(),
    };
    actor.ask(request).await.unwrap();

    let total: usize = actor.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    actor.stop_gracefully().await.unwrap();
}
/// Input that is not valid JSON is reported as an error.
#[tokio::test]
async fn from_jsonl_invalid_returns_error() {
    let actor = spawn_actor(Vec::new());

    let request = RequestFromJsonl {
        jsonl: "not json".into(),
    };
    let outcome = actor.ask(request).await;
    assert!(outcome.is_err());

    actor.stop_gracefully().await.unwrap();
}
/// Exporting one context to JSONL and importing it into a fresh context
/// reproduces the same messages in the same order.
#[tokio::test]
async fn to_jsonl_then_from_jsonl_roundtrip() {
    let source = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: "sys".into(),
    }]);
    let appended = MockMessage {
        role: Role::User,
        content: "u".into(),
    };
    source
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    let jsonl: String = source.ask(RequestToJsonl).await.unwrap();
    assert!(!jsonl.is_empty());

    // Import into a second, initially empty context.
    let target = spawn_actor(Vec::new());
    target.ask(RequestFromJsonl { jsonl }).await.unwrap();

    let total: usize = target.ask(RequestLen).await.unwrap();
    assert_eq!(total, 2);

    let restored: Vec<MockMessage> = target.ask(RequestMessages).await.unwrap();
    assert_eq!(restored[0].content, "sys");
    assert_eq!(restored[1].content, "u");

    target.stop_gracefully().await.unwrap();
    source.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_msg_with_scratch_appends_to_sent_messages() {
use std::sync::atomic::Ordering;
let reply = MockMessage {
role: Role::Assistant,
content: "reply".into(),
};
let backend = MockBackend {
send_response: vec![reply],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend.clone(), vec![]);
let actor = AgentContext::spawn(ctx);
actor
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "hello".into(),
},
})
.await
.unwrap();
let response = actor
.ask(RequestSend {
opts: MockOpts {
common: CommonOpts {
model: "test".into(),
context_window: 1000,
max_tokens: 4096,
auto_compress: false,
scratch: Some("当前时间: 2026-05-28 20:00".into()),
},
token: 0,
},
})
.await
.unwrap();
assert_eq!(response.len(), 1, "Should have one response message");
assert_eq!(
backend.last_sent_count.load(Ordering::SeqCst),
2,
"scratch 应作为额外 system 消息追加"
);
let inc: Vec<MockMessage> = actor.ask(RequestIncremental).await.unwrap();
assert_eq!(inc.len(), 2);
assert_eq!(inc[0].role, Role::User);
assert_eq!(inc[1].role, Role::Assistant);
assert_eq!(inc[1].content, "reply");
actor.stop_gracefully().await.unwrap();
}
/// Without `scratch`, the backend receives exactly the messages held by the context.
#[tokio::test]
async fn send_msg_without_scratch_is_unchanged() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Keep a handle on the recorder instead of cloning the whole backend.
    let sent_count = Arc::new(AtomicUsize::new(0));
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "reply".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::clone(&sent_count),
    };
    let actor = AgentContext::spawn(AgentContext::new(backend, Vec::new()));

    let question = MockMessage {
        role: Role::User,
        content: "hello".into(),
    };
    actor
        .ask(RequestAppend { message: question })
        .await
        .unwrap();

    let response = actor.ask(RequestSend { opts: mock_opts(0) }).await.unwrap();
    assert_eq!(response.len(), 1, "Should have one response message");

    // Message text: "no extra message is appended when scratch is None".
    assert_eq!(
        sent_count.load(Ordering::SeqCst),
        1,
        "scratch 为 None 时不追加额外消息"
    );

    actor.stop_gracefully().await.unwrap();
}
}