mod actor;
mod event;
mod stream;
mod types;
pub use actor::{
AgentContext, AppendMsg, ClearMsg, CompressMsg, CompressedMsg, EstimateTokensMsg, ExtendMsg,
FindByRoleMsg, FromJsonlMsg, Get, ImmutableMsg, IncrementalMsg, InsertMsg, IsEmpty, IsFullMsg,
Len, MessagesMsg, PopMsg, RemoveMsg, RetainMsg, SendMsg, SendStreamMsg, SilentAppendMsg,
ToJsonlMsg, UpdateMsg,
};
pub use event::{ChangeEvent, CompressStrategy};
pub use stream::AgentSendStream;
pub use types::{
ContextBackend, ContextBackendResponse, ResponseType, ScratchOpts, StreamEvent, ToolCallInfo,
};
#[cfg(test)]
mod tests {
use super::actor::{
AgentContext, AppendMsg, ClearMsg, CompressMsg, CompressedMsg, EstimateTokensMsg,
ExtendMsg, FindByRoleMsg, FromJsonlMsg, Get, IncrementalMsg, InsertMsg, IsEmpty, IsFullMsg,
Len, MessagesMsg, PopMsg, RemoveMsg, RetainMsg, SendMsg, SendStreamMsg, SilentAppendMsg,
ToJsonlMsg, UpdateMsg,
};
use super::event::CompressStrategy;
use super::types::{ContextBackend, ContextBackendResponse, ResponseType, ToolCallInfo};
use crate::error::AgentError;
use crate::message::ContextMessage;
use crate::role::Role;
use kameo::prelude::*;
use std::sync::Arc;
/// Minimal message type used by the tests in this module: a role plus plain text.
///
/// It derives serde's `Serialize`/`Deserialize`, which the JSONL tests rely on.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MockMessage {
/// Role attached to the message.
pub role: Role,
/// Text payload of the message.
pub content: String,
}
/// [`ContextMessage`] for the test message type, which has no reasoning field at all.
impl ContextMessage for MockMessage {
    /// Returns the role stored on the message.
    fn role(&self) -> Role {
        self.role
    }

    /// Returns a copy of the message whose role is replaced by `role`; content is untouched.
    fn with_role(self, role: Role) -> Self {
        Self { role, ..self }
    }

    /// Always `false`: there is nothing to preserve.
    fn preserve_reasoning(&self) -> bool {
        false
    }

    /// Identity: the message is returned unchanged.
    fn without_reasoning(self) -> Self {
        self
    }
}
/// Treats a list of mock messages as one backend response whose text is the
/// concatenation of every message's `content`.
impl ContextBackendResponse for Vec<MockMessage> {
    /// Returns `ResponseType::Content` when at least one message has non-empty
    /// text, otherwise `ResponseType::Empty` (this includes an empty list).
    fn response_type(&self) -> ResponseType {
        // Inspecting each message directly avoids building a throwaway
        // concatenated `String` just to test it for emptiness.
        if self.iter().all(|m| m.content.is_empty()) {
            ResponseType::Empty
        } else {
            ResponseType::Content
        }
    }

    /// The mock never carries reasoning content.
    fn reasoning_content(&self) -> Option<String> {
        None
    }

    /// Returns the concatenated text of all messages, or `None` when that text is empty.
    fn content(&self) -> Option<String> {
        // Empty contents add nothing to the concatenation, so no filtering is needed.
        let text: String = self.iter().map(|m| m.content.as_str()).collect();
        (!text.is_empty()).then_some(text)
    }

    /// The mock never reports tool calls.
    fn tool_calls(&self) -> Vec<ToolCallInfo> {
        Vec::new()
    }
}
/// Request options used by the mock backend in these tests.
///
/// `Default` yields `token == 0` and `scratch == None`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MockOpts {
/// Marker value; the mock backend records it so tests can verify options were passed through.
pub token: u64,
/// Optional scratch text exposed through the `ScratchOpts` implementation.
pub scratch: Option<String>,
}
/// Exposes the optional `scratch` field of [`MockOpts`] through the `ScratchOpts` trait.
impl super::types::ScratchOpts for MockOpts {
/// Borrows the scratch text, or returns `None` when no scratch is set.
fn scratch(&self) -> Option<&str> {
self.scratch.as_deref()
}
}
/// Configurable in-memory backend used by the tests.
///
/// The two recorder fields are behind `Arc`, so a clone kept by a test observes
/// values written through the clone that was handed to the actor.
#[derive(Clone)]
pub struct MockBackend {
/// Canned messages returned by `send` and emitted one at a time by `send_stream`.
pub send_response: Vec<MockMessage>,
/// Value reported by `estimate_tokens`.
pub token_count: usize,
/// `token` of the options most recently passed to the backend.
pub last_opts_token: Arc<std::sync::atomic::AtomicU64>,
/// Number of messages the backend was most recently asked to send.
pub last_sent_count: Arc<std::sync::atomic::AtomicUsize>,
}
/// In-memory [`ContextBackend`] for tests: it answers with canned messages and
/// records what it was called with so tests can inspect the values afterwards.
impl ContextBackend for MockBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Builds a `Role::User` message from `content`.
    fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: content.into(),
        }
    }

    /// Builds a `Role::System` message from `content`.
    fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: content.into(),
        }
    }

    /// Builds a `Role::Tool` message from `content`.
    ///
    /// `MockMessage` has no field for the call id, so `tool_call_id` is discarded.
    fn tool_message(
        &self,
        tool_call_id: impl Into<String> + Send,
        content: impl Into<String> + Send,
    ) -> Self::Message {
        let _ = tool_call_id;
        MockMessage {
            role: Role::Tool,
            content: content.into(),
        }
    }

    /// Flattens all responses into a single list of cloned messages; never fails.
    fn extract_messages_from_backend_response(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
    }

    /// The mock never merges streamed chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Fixed context window size of 1000.
    fn context_window(&self) -> usize {
        1000
    }

    /// Returns the configured `token_count`, ignoring the messages.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(self.token_count)
    }

    /// Records `opts.token` and the number of messages, then returns a copy of
    /// `send_response`.
    async fn send(
        &self,
        messages: &[Self::Message],
        opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        Ok(self.send_response.clone())
    }

    /// Records `opts.token` and the number of messages, then returns a stream
    /// that yields each canned message as its own single-element response.
    ///
    /// Both recordings happen when this method is called, not when the stream is polled.
    fn send_stream(
        &self,
        messages: Vec<Self::Message>,
        opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        // Keep the recorder consistent with `send`, so streaming requests can be
        // inspected the same way instead of leaving a stale count behind.
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        let responses = self.send_response.clone();
        futures::stream::iter(responses.into_iter().map(|m| Ok(vec![m])))
    }
}
/// Spawns an `AgentContext` actor over a default `MockBackend` (no canned reply,
/// zero token estimate, zeroed recorders) seeded with the given immutable messages.
fn spawn_actor(immutable: Vec<MockMessage>) -> ActorRef<AgentContext<MockBackend>> {
    AgentContext::spawn(AgentContext::new(
        MockBackend {
            send_response: Vec::new(),
            token_count: 0,
            // `Arc<Atomic*>::default()` is an `Arc` around a zero-initialised atomic.
            last_opts_token: Arc::default(),
            last_sent_count: Arc::default(),
        },
        immutable,
    ))
}
/// A context created without any messages reports length zero and is empty.
#[tokio::test]
async fn new_creates_empty_zones() {
    let ctx_ref = spawn_actor(Vec::new());
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 0);
    assert!(ctx_ref.ask(IsEmpty).await.unwrap());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// A context seeded with one immutable message has length one and is not empty.
#[tokio::test]
async fn new_with_immutable() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("system"),
    };
    let ctx_ref = spawn_actor(vec![system]);
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 1);
    assert!(!ctx_ref.ask(IsEmpty).await.unwrap());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Appending one message invokes the `on_change` callback exactly once.
#[tokio::test]
async fn append_and_on_change() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let notifications = Arc::new(AtomicUsize::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    // The callback only counts how many times it was invoked.
    let observer = Arc::clone(&notifications);
    let ctx = AgentContext::new(backend, Vec::new()).with_on_change(move |_| {
        observer.fetch_add(1, Ordering::SeqCst);
    });
    let ctx_ref = AgentContext::spawn(ctx);
    let greeting = MockMessage {
        role: Role::User,
        content: String::from("hi"),
    };
    ctx_ref.ask(AppendMsg { message: greeting }).await.unwrap();
    assert_eq!(notifications.load(Ordering::SeqCst), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// A message appended to an empty context can be read back at index 0.
#[tokio::test]
async fn append_and_get() {
    let ctx_ref = spawn_actor(Vec::new());
    let greeting = MockMessage {
        role: Role::User,
        content: String::from("hi"),
    };
    ctx_ref.ask(AppendMsg { message: greeting }).await.unwrap();
    let fetched: Option<MockMessage> = ctx_ref.ask(Get(0)).await.unwrap();
    // Covers both "something is stored" and "it has the expected text".
    assert_eq!(fetched.map(|m| m.content).as_deref(), Some("hi"));
    ctx_ref.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn extend_multiple() {
let actor = spawn_actor(vec![]);
actor
.ask(ExtendMsg {
messages: vec![
MockMessage {
role: Role::User,
content: "a".into(),
},
MockMessage {
role: Role::Assistant,
content: "b".into(),
},
],
})
.await
.unwrap();
let len: usize = actor.ask(Len).await.unwrap();
assert_eq!(len, 2);
actor.stop_gracefully().await.unwrap();
}
/// `Get` indexes across the immutable message and the appended one, and yields
/// `None` for an index past the end.
#[tokio::test]
async fn get_crosses_zones() {
    let ctx_ref = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: String::from("sys"),
    }]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("user"),
    };
    ctx_ref.ask(AppendMsg { message: appended }).await.unwrap();

    let first: Option<MockMessage> = ctx_ref.ask(Get(0)).await.unwrap();
    assert_eq!(first.unwrap().content, "sys");
    let second: Option<MockMessage> = ctx_ref.ask(Get(1)).await.unwrap();
    assert_eq!(second.unwrap().content, "user");
    let missing: Option<MockMessage> = ctx_ref.ask(Get(99)).await.unwrap();
    assert!(missing.is_none());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// `MessagesMsg` returns the immutable message followed by the appended one.
#[tokio::test]
async fn messages_returns_all_zones() {
    let ctx_ref = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: String::from("sys"),
    }]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    ctx_ref.ask(AppendMsg { message: appended }).await.unwrap();

    let everything: Vec<MockMessage> = ctx_ref.ask(MessagesMsg).await.unwrap();
    assert_eq!(everything.len(), 2);
    assert_eq!(everything[0].content, "sys");
    assert_eq!(everything[1].content, "u");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// After one append, `IncrementalMsg` returns exactly one message.
#[tokio::test]
async fn incremental_readonly() {
    let ctx_ref = spawn_actor(Vec::new());
    let appended = MockMessage {
        role: Role::User,
        content: String::from("x"),
    };
    ctx_ref.ask(AppendMsg { message: appended }).await.unwrap();
    let incremental: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(incremental.len(), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// With user, assistant, user appended in that order, searching by `Role::User`
/// finds two messages.
#[tokio::test]
async fn find_by_role() {
    let ctx_ref = spawn_actor(Vec::new());
    // Appended one at a time, in this order.
    for (role, text) in [
        (Role::User, "u1"),
        (Role::Assistant, "a1"),
        (Role::User, "u2"),
    ] {
        let message = MockMessage {
            role,
            content: text.to_owned(),
        };
        ctx_ref.ask(AppendMsg { message }).await.unwrap();
    }
    let from_user: Vec<MockMessage> = ctx_ref.ask(FindByRoleMsg(Role::User)).await.unwrap();
    assert_eq!(from_user.len(), 2);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// `SilentAppendMsg` stores the message without invoking the `on_change` callback.
#[tokio::test]
async fn silent_append_no_on_change() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let notifications = Arc::new(AtomicUsize::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let observer = Arc::clone(&notifications);
    let ctx = AgentContext::new(backend, Vec::new()).with_on_change(move |_| {
        observer.fetch_add(1, Ordering::SeqCst);
    });
    let ctx_ref = AgentContext::spawn(ctx);
    let quiet = MockMessage {
        role: Role::User,
        content: String::from("silent"),
    };
    ctx_ref.ask(SilentAppendMsg { message: quiet }).await.unwrap();
    // The callback never fired, yet the message is stored.
    assert_eq!(notifications.load(Ordering::SeqCst), 0);
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Updating index 0 replaces the stored message's content.
#[tokio::test]
async fn update_message() {
    let ctx_ref = spawn_actor(Vec::new());
    let original = MockMessage {
        role: Role::User,
        content: String::from("old"),
    };
    ctx_ref.ask(AppendMsg { message: original }).await.unwrap();
    let replacement = MockMessage {
        role: Role::User,
        content: String::from("new"),
    };
    ctx_ref
        .ask(UpdateMsg {
            index: 0,
            message: replacement,
        })
        .await
        .unwrap();
    let stored: Option<MockMessage> = ctx_ref.ask(Get(0)).await.unwrap();
    assert_eq!(stored.unwrap().content, "new");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Updating index 0 of an empty context yields an error.
#[tokio::test]
async fn update_out_of_bounds() {
    let ctx_ref = spawn_actor(Vec::new());
    let message = MockMessage {
        role: Role::User,
        content: String::from("x"),
    };
    let outcome = ctx_ref.ask(UpdateMsg { index: 0, message }).await;
    assert!(outcome.is_err());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Inserting at index 0 places the new message ahead of the existing one.
#[tokio::test]
async fn insert_message() {
    let ctx_ref = spawn_actor(Vec::new());
    let existing = MockMessage {
        role: Role::User,
        content: String::from("a"),
    };
    ctx_ref.ask(AppendMsg { message: existing }).await.unwrap();
    let inserted = MockMessage {
        role: Role::User,
        content: String::from("b"),
    };
    ctx_ref
        .ask(InsertMsg {
            index: 0,
            message: inserted,
        })
        .await
        .unwrap();
    let incremental: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(incremental[0].content, "b");
    assert_eq!(incremental[1].content, "a");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Removing index 0 from two messages leaves only the second one.
#[tokio::test]
async fn remove_message() {
    let ctx_ref = spawn_actor(Vec::new());
    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.to_owned(),
        };
        ctx_ref.ask(AppendMsg { message }).await.unwrap();
    }
    ctx_ref.ask(RemoveMsg { index: 0 }).await.unwrap();
    let remaining: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].content, "b");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Popping returns the most recently appended message and shrinks the context by one.
#[tokio::test]
async fn pop_message() {
    let ctx_ref = spawn_actor(Vec::new());
    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.to_owned(),
        };
        ctx_ref.ask(AppendMsg { message }).await.unwrap();
    }
    let last: Option<MockMessage> = ctx_ref.ask(PopMsg).await.unwrap();
    assert_eq!(last.unwrap().content, "b");
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Retaining `Role::User` drops the assistant message and keeps both user messages.
#[tokio::test]
async fn retain_by_role() {
    let ctx_ref = spawn_actor(Vec::new());
    for (role, text) in [
        (Role::User, "u"),
        (Role::Assistant, "a"),
        (Role::User, "u2"),
    ] {
        let message = MockMessage {
            role,
            content: text.to_owned(),
        };
        ctx_ref.ask(AppendMsg { message }).await.unwrap();
    }
    ctx_ref.ask(RetainMsg { role: Role::User }).await.unwrap();
    let kept: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(kept.len(), 2);
    let only_users = kept.iter().all(|m| m.role() == Role::User);
    assert!(only_users);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Clearing removes the appended message but keeps the immutable one (length stays 1).
#[tokio::test]
async fn clear_incremental() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let ctx_ref = spawn_actor(vec![system]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    ctx_ref.ask(AppendMsg { message: appended }).await.unwrap();
    ctx_ref.ask(ClearMsg).await.unwrap();
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Clearing an already empty context succeeds.
#[tokio::test]
async fn clear_empty_is_noop() {
    let ctx_ref = spawn_actor(Vec::new());
    ctx_ref.ask(ClearMsg).await.unwrap();
    ctx_ref.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn compress_summarize() {
let summary_reply = MockMessage {
role: Role::Assistant,
content: "摘要".into(),
};
let backend = MockBackend {
send_response: vec![summary_reply],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor = AgentContext::spawn(ctx);
for i in 0..5 {
actor
.ask(AppendMsg {
message: MockMessage {
role: Role::User,
content: format!("msg{i}"),
},
})
.await
.unwrap();
}
actor
.ask(CompressMsg {
strategy: CompressStrategy::Summarize {
keep: 2,
prompt: None,
},
opts: MockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
let inc: Vec<MockMessage> = actor.ask(IncrementalMsg).await.unwrap();
assert_eq!(inc.len(), 2);
let comp: Vec<MockMessage> = actor.ask(CompressedMsg).await.unwrap();
assert_eq!(comp.len(), 1);
assert_eq!(comp[0].role, Role::System);
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn compress_noop_when_below_keep() {
let backend = MockBackend {
send_response: vec![],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor = AgentContext::spawn(ctx);
actor
.ask(AppendMsg {
message: MockMessage {
role: Role::User,
content: "only".into(),
},
})
.await
.unwrap();
actor
.ask(CompressMsg {
strategy: CompressStrategy::Summarize {
keep: 5,
prompt: None,
},
opts: MockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
let inc: Vec<MockMessage> = actor.ask(IncrementalMsg).await.unwrap();
assert_eq!(inc.len(), 1);
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_appends_to_incremental() {
let reply = MockMessage {
role: Role::Assistant,
content: "reply".into(),
};
let backend = MockBackend {
send_response: vec![reply],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor = AgentContext::spawn(ctx);
let raw = actor
.ask(SendMsg {
opts: MockOpts {
token: 0,
..Default::default()
},
})
.await
.unwrap();
assert_eq!(raw[0].content, "reply");
let inc: Vec<MockMessage> = actor.ask(IncrementalMsg).await.unwrap();
assert_eq!(inc.len(), 1);
assert_eq!(inc[0].content, "reply");
actor.stop_gracefully().await.unwrap();
}
/// Draining the stream returned for `SendStreamMsg` eventually leaves both
/// streamed chunks stored as incremental messages.
#[tokio::test]
async fn send_stream_collects_and_stores() {
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    let chunks = vec![
        MockMessage {
            role: Role::Assistant,
            content: "chunk1".into(),
        },
        MockMessage {
            role: Role::Assistant,
            content: "chunk2".into(),
        },
    ];
    let backend = MockBackend {
        send_response: chunks,
        token_count: 0,
        last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    };
    let ctx = AgentContext::new(backend, vec![]);
    let actor = AgentContext::spawn(ctx);
    {
        // Scoped so the stream is dropped before the context is inspected.
        let mut stream = actor
            .ask(SendStreamMsg {
                opts: MockOpts {
                    token: 0,
                    ..Default::default()
                },
            })
            .await
            .unwrap();
        while stream.next().await.is_some() {}
    }
    // The stored messages may only appear some time after the stream ends
    // (presumably written back asynchronously; the handler is not visible here).
    // Poll with a deadline rather than sleeping a fixed interval: the test
    // finishes as soon as the data is there and tolerates a slow machine.
    let deadline = Instant::now() + Duration::from_secs(2);
    let inc: Vec<MockMessage> = loop {
        let snapshot: Vec<MockMessage> = actor.ask(IncrementalMsg).await.unwrap();
        if snapshot.len() >= 2 || Instant::now() >= deadline {
            break snapshot;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    };
    assert_eq!(inc.len(), 2);
    actor.stop_gracefully().await.unwrap();
}
/// `EstimateTokensMsg` reports the value produced by the backend (42 here).
#[tokio::test]
async fn estimate_tokens_delegates() {
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 42,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend, Vec::new()));
    assert_eq!(ctx_ref.ask(EstimateTokensMsg).await.unwrap(), 42);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// With a token estimate of 1001 against the mock's window of 1000, `IsFullMsg` is true.
#[tokio::test]
async fn is_full_checks_context_window() {
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 1001,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend, Vec::new()));
    assert!(ctx_ref.ask(IsFullMsg).await.unwrap());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// The options given to `SendMsg` reach the backend: the recorded token equals 42.
#[tokio::test]
async fn send_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    // The actor gets a clone; the recorders are shared with `backend` through `Arc`.
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend.clone(), Vec::new()));
    let opts = MockOpts {
        token: 42,
        scratch: None,
    };
    let _ = ctx_ref.ask(SendMsg { opts }).await.unwrap();
    let recorded = backend.last_opts_token.load(Ordering::SeqCst);
    assert_eq!(recorded, 42, "SendMsg 应将 opts 透传给 backend.send()");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// The options given to `SendStreamMsg` reach the backend: the recorded token
/// eventually equals 7.
#[tokio::test]
async fn send_stream_opts_passthrough() {
    use futures::StreamExt;
    use std::sync::atomic::Ordering;
    use std::time::{Duration, Instant};

    let backend = MockBackend {
        send_response: vec![],
        token_count: 0,
        last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    };
    // The actor gets a clone; the recorders are shared with `backend` through `Arc`.
    let ctx = AgentContext::new(backend.clone(), vec![]);
    let actor = AgentContext::spawn(ctx);
    {
        let mut stream = actor
            .ask(SendStreamMsg {
                opts: MockOpts {
                    token: 7,
                    ..Default::default()
                },
            })
            .await
            .unwrap();
        while stream.next().await.is_some() {}
    }
    // Poll with a deadline rather than sleeping a fixed interval: the test
    // finishes as soon as the token is recorded and tolerates a slow machine.
    let deadline = Instant::now() + Duration::from_secs(2);
    while backend.last_opts_token.load(Ordering::SeqCst) != 7 && Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert_eq!(
        backend.last_opts_token.load(Ordering::SeqCst),
        7,
        "SendStreamMsg 应将 opts 透传给 backend.send_stream()"
    );
    actor.stop_gracefully().await.unwrap();
}
/// The options given to `CompressMsg` reach the backend: the recorded token equals 99.
#[tokio::test]
async fn compress_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: String::from("摘要"),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend.clone(), Vec::new()));
    for content in (0..5).map(|i| format!("msg{i}")) {
        let message = MockMessage {
            role: Role::User,
            content,
        };
        ctx_ref.ask(AppendMsg { message }).await.unwrap();
    }
    let request = CompressMsg {
        strategy: CompressStrategy::Summarize {
            keep: 2,
            prompt: None,
        },
        opts: MockOpts {
            token: 99,
            scratch: None,
        },
    };
    ctx_ref.ask(request).await.unwrap();
    let recorded = backend.last_opts_token.load(Ordering::SeqCst);
    assert_eq!(recorded, 99, "CompressMsg 应将 opts 透传给 backend.send()");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// A token estimate above the context window (1001 vs 1000) makes `IsFullMsg` report full.
#[tokio::test]
async fn is_full_when_tokens_exceed_window() {
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 1001,
        last_opts_token: Arc::default(),
        last_sent_count: Arc::default(),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend, Vec::new()));
    let is_full: bool = ctx_ref.ask(IsFullMsg).await.unwrap();
    assert!(is_full, "token_count > context_window 应返回已满");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Backend whose `extract_messages_from_backend_response` always fails; every
/// other method returns a trivial value.
#[derive(Clone)]
struct ExtractErrorBackend;

impl ContextBackend for ExtractErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Builds a `Role::User` message.
    fn user_message(&self, text: impl Into<String> + Send) -> Self::Message {
        let content = text.into();
        MockMessage {
            role: Role::User,
            content,
        }
    }

    /// Builds a `Role::System` message.
    fn system_message(&self, text: impl Into<String> + Send) -> Self::Message {
        let content = text.into();
        MockMessage {
            role: Role::System,
            content,
        }
    }

    /// Builds a `Role::Tool` message; the call id is discarded.
    fn tool_message(
        &self,
        call_id: impl Into<String> + Send,
        text: impl Into<String> + Send,
    ) -> Self::Message {
        drop(call_id);
        let content = text.into();
        MockMessage {
            role: Role::Tool,
            content,
        }
    }

    /// The failure under test: extraction always returns a context error.
    fn extract_messages_from_backend_response(
        &self,
        _responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Err(AgentError::Context("模拟提取失败".into()))
    }

    /// Never merges chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Fixed context window size of 1000.
    fn context_window(&self) -> usize {
        1000
    }

    /// Always estimates zero tokens.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(0)
    }

    /// Succeeds with an empty response.
    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    /// Returns a stream that yields nothing.
    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
#[derive(Clone)]
struct ToRequestErrorBackend;
impl ContextBackend for ToRequestErrorBackend {
type Message = MockMessage;
type Opts = MockOpts;
type Response = Vec<MockMessage>;
fn to_request_messages(&self, _: Vec<MockMessage>) -> Result<Vec<MockMessage>, AgentError> {
Err(AgentError::Context("模拟转换失败".into()))
}
fn merge_chunks(&self, _: &[Self::Response]) -> Option<Self::Message> {
None
}
fn user_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::User,
content: c.into(),
}
}
fn system_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::System,
content: c.into(),
}
}
fn tool_message(
&self,
id: impl Into<String> + Send,
c: impl Into<String> + Send,
) -> MockMessage {
let _ = id;
MockMessage {
role: Role::Tool,
content: c.into(),
}
}
fn extract_messages_from_backend_response(
&self,
responses: &[Vec<MockMessage>],
) -> Result<Vec<MockMessage>, AgentError> {
Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
}
fn context_window(&self) -> usize {
1000
}
async fn estimate_tokens(&self, _: &[MockMessage]) -> Result<usize, AgentError> {
Ok(0)
}
async fn send(
&self,
_: &[MockMessage],
_: &MockOpts,
) -> Result<Vec<MockMessage>, AgentError> {
Ok(vec![])
}
fn send_stream(
&self,
_: Vec<MockMessage>,
_: MockOpts,
) -> impl futures_core::Stream<Item = Result<Vec<MockMessage>, AgentError>> + Send + 'static
{
futures::stream::empty()
}
}
/// Backend whose `estimate_tokens` always fails; every other method returns a
/// trivial value.
#[derive(Clone)]
struct EstimateErrorBackend;

impl ContextBackend for EstimateErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Builds a `Role::User` message.
    fn user_message(&self, text: impl Into<String> + Send) -> Self::Message {
        let content = text.into();
        MockMessage {
            role: Role::User,
            content,
        }
    }

    /// Builds a `Role::System` message.
    fn system_message(&self, text: impl Into<String> + Send) -> Self::Message {
        let content = text.into();
        MockMessage {
            role: Role::System,
            content,
        }
    }

    /// Builds a `Role::Tool` message; the call id is discarded.
    fn tool_message(
        &self,
        call_id: impl Into<String> + Send,
        text: impl Into<String> + Send,
    ) -> Self::Message {
        drop(call_id);
        let content = text.into();
        MockMessage {
            role: Role::Tool,
            content,
        }
    }

    /// Flattens all responses into one list of cloned messages.
    fn extract_messages_from_backend_response(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        let flattened = responses.iter().flatten().cloned().collect();
        Ok(flattened)
    }

    /// Never merges chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Fixed context window size of 1000.
    fn context_window(&self) -> usize {
        1000
    }

    /// The failure under test: estimation always returns a context error.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Err(AgentError::Context("模拟估算失败".into()))
    }

    /// Succeeds with an empty response.
    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    /// Returns a stream that yields nothing.
    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
#[tokio::test]
async fn send_extract_error_propagates() {
let ctx = AgentContext::new(ExtractErrorBackend, vec![]);
let actor = AgentContext::spawn(ctx);
let result = actor
.ask(SendMsg {
opts: MockOpts {
token: 0,
..Default::default()
},
})
.await;
assert!(result.is_err(), "extract 失败应传播错误");
actor.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_to_request_error_propagates() {
let ctx = AgentContext::new(ToRequestErrorBackend, vec![]);
let actor = AgentContext::spawn(ctx);
let result = actor
.ask(SendMsg {
opts: MockOpts {
token: 0,
..Default::default()
},
})
.await;
assert!(result.is_err(), "to_request 失败应传播错误");
actor.stop_gracefully().await.unwrap();
}
/// When the backend's estimate fails, `EstimateTokensMsg` still succeeds and reports 0.
#[tokio::test]
async fn estimate_tokens_error_returns_zero() {
    let ctx_ref = AgentContext::spawn(AgentContext::new(EstimateErrorBackend, Vec::new()));
    let estimate: usize = ctx_ref.ask(EstimateTokensMsg).await.unwrap();
    assert_eq!(estimate, 0, "估算失败应降级返回 0");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// `ToJsonlMsg` emits one line per message: the immutable one first, then the appended one.
#[tokio::test]
async fn to_jsonl_exports_all_messages() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let ctx_ref = spawn_actor(vec![system]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("hello"),
    };
    ctx_ref.ask(AppendMsg { message: appended }).await.unwrap();

    let exported: String = ctx_ref.ask(ToJsonlMsg).await.unwrap();
    let rows: Vec<&str> = exported.lines().collect();
    assert_eq!(rows.len(), 2);
    assert!(rows[0].contains("sys"));
    assert!(rows[1].contains("hello"));
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Loading two JSONL lines yields two incremental messages in the same order.
#[tokio::test]
async fn from_jsonl_loads_messages() {
    let ctx_ref = spawn_actor(Vec::new());
    // The second line stays at column 0 so the raw string content is unchanged.
    let jsonl = r#"{"role":"User","content":"hello"}
{"role":"Assistant","content":"hi there"}"#;
    let request = FromJsonlMsg {
        jsonl: jsonl.into(),
    };
    ctx_ref.ask(request).await.unwrap();
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 2);
    let loaded: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(loaded[0].content, "hello");
    assert_eq!(loaded[1].content, "hi there");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Blank lines around a single JSONL record are ignored: exactly one message is loaded.
#[tokio::test]
async fn from_jsonl_skips_empty_lines() {
    let ctx_ref = spawn_actor(Vec::new());
    let padded = "\n\n{\"role\":\"User\",\"content\":\"only\"}\n\n";
    let request = FromJsonlMsg {
        jsonl: padded.into(),
    };
    ctx_ref.ask(request).await.unwrap();
    assert_eq!(ctx_ref.ask(Len).await.unwrap(), 1);
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Input that is not JSON makes `FromJsonlMsg` return an error.
#[tokio::test]
async fn from_jsonl_invalid_returns_error() {
    let ctx_ref = spawn_actor(Vec::new());
    let request = FromJsonlMsg {
        jsonl: "not json".into(),
    };
    let outcome = ctx_ref.ask(request).await;
    assert!(outcome.is_err());
    ctx_ref.stop_gracefully().await.unwrap();
}
/// JSONL exported from one context can be loaded into a fresh one, preserving
/// message count, order and content.
#[tokio::test]
async fn to_jsonl_then_from_jsonl_roundtrip() {
    let source = spawn_actor(vec![MockMessage {
        role: Role::System,
        content: String::from("sys"),
    }]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    source.ask(AppendMsg { message: appended }).await.unwrap();
    let exported: String = source.ask(ToJsonlMsg).await.unwrap();
    assert!(!exported.is_empty());

    let target = spawn_actor(Vec::new());
    target
        .ask(FromJsonlMsg { jsonl: exported })
        .await
        .unwrap();
    assert_eq!(target.ask(Len).await.unwrap(), 2);
    let restored: Vec<MockMessage> = target.ask(MessagesMsg).await.unwrap();
    assert_eq!(restored[0].content, "sys");
    assert_eq!(restored[1].content, "u");

    // Same shutdown order as before: the importing actor first, then the exporter.
    target.stop_gracefully().await.unwrap();
    source.stop_gracefully().await.unwrap();
}
/// When the backend's estimate fails, `IsFullMsg` still succeeds and reports full.
#[tokio::test]
async fn is_full_error_returns_true() {
    let ctx_ref = AgentContext::spawn(AgentContext::new(EstimateErrorBackend, Vec::new()));
    let is_full: bool = ctx_ref.ask(IsFullMsg).await.unwrap();
    assert!(is_full, "估算失败应降级为已满（安全策略）");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// With scratch text set, the backend receives one extra message (2 in total),
/// while the stored history holds only the user message and the reply.
#[tokio::test]
async fn send_msg_with_scratch_appends_to_sent_messages() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: String::from("reply"),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend.clone(), Vec::new()));
    let question = MockMessage {
        role: Role::User,
        content: String::from("hello"),
    };
    ctx_ref.ask(AppendMsg { message: question }).await.unwrap();
    let opts = MockOpts {
        token: 0,
        scratch: Some("当前时间: 2026-05-28 20:00".into()),
    };
    let _ = ctx_ref.ask(SendMsg { opts }).await.unwrap();

    let sent = backend.last_sent_count.load(Ordering::SeqCst);
    assert_eq!(sent, 2, "scratch 应作为额外 system 消息追加");
    // The scratch message is not part of the stored history.
    let history: Vec<MockMessage> = ctx_ref.ask(IncrementalMsg).await.unwrap();
    assert_eq!(history.len(), 2);
    assert_eq!(history[0].role, Role::User);
    assert_eq!(history[1].role, Role::Assistant);
    assert_eq!(history[1].content, "reply");
    ctx_ref.stop_gracefully().await.unwrap();
}
/// Without scratch text, the backend receives exactly the one stored message.
#[tokio::test]
async fn send_msg_without_scratch_is_unchanged() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: String::from("reply"),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let ctx_ref = AgentContext::spawn(AgentContext::new(backend.clone(), Vec::new()));
    let question = MockMessage {
        role: Role::User,
        content: String::from("hello"),
    };
    ctx_ref.ask(AppendMsg { message: question }).await.unwrap();
    let opts = MockOpts {
        token: 0,
        scratch: None,
    };
    let _ = ctx_ref.ask(SendMsg { opts }).await.unwrap();

    let sent = backend.last_sent_count.load(Ordering::SeqCst);
    assert_eq!(sent, 1, "scratch 为 None 时不追加额外消息");
    ctx_ref.stop_gracefully().await.unwrap();
}
}