mod actor;
mod events;
mod stream;
mod types;
pub use actor::AgentContext;
pub use events::{
CompressStrategy, NotifyChange, NotifyCompressedForReply, RequestAppend, RequestClear,
RequestCompress, RequestCompressed, RequestEstimateTokens, RequestExtend, RequestFindByRole,
RequestImportIncremental, RequestGet, RequestImmutable, RequestIncremental, RequestInsert,
RequestIsEmpty, RequestLen, RequestMessages, RequestPop, RequestRemove, RequestRetain,
RequestSend, RequestSendStream, RequestSubscribeChange, RequestSubscribeCompressed,
RequestExportIncremental, RequestExportAll, RequestUnsubscribeChange, RequestUnsubscribeCompressed, RequestUpdate,
};
pub use stream::AgentSendStream;
pub use types::{
CommonOpts, ContextBackend, ContextBackendResponse, ResponseType, StreamEvent, ToolCallInfo,
};
#[cfg(test)]
mod tests {
use super::actor::AgentContext;
use super::events::{
CompressStrategy, NotifyChange, RequestAppend, RequestClear, RequestCompress,
RequestCompressed, RequestEstimateTokens, RequestExtend, RequestFindByRole,
RequestImportIncremental, RequestGet, RequestIncremental, RequestInsert, RequestIsEmpty,
RequestLen, RequestMessages, RequestPop, RequestRemove, RequestRetain, RequestSend,
RequestSendStream, RequestSubscribeChange,
RequestExportAll, RequestUpdate,
};
use super::types::{
CommonOpts, ContextBackend, ContextBackendResponse, ResponseType, ToolCallInfo,
};
use crate::error::AgentError;
use crate::message::ContextMessage;
use crate::role::Role;
use kameo::prelude::*;
use std::sync::Arc;
/// Minimal message type used by the tests in this module: a role plus plain text.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MockMessage {
    /// Role attached to the message.
    pub role: Role,
    /// Text payload of the message.
    pub content: String,
}

/// `ContextMessage` implementation for a message that carries no reasoning data.
impl ContextMessage for MockMessage {
    /// Returns the stored role.
    fn role(&self) -> Role {
        let Self { role, .. } = self;
        *role
    }

    /// Always `false`.
    fn preserve_reasoning(&self) -> bool {
        false
    }

    /// Returns the message unchanged.
    fn without_reasoning(self) -> Self {
        self
    }

    /// Returns the same message with its role replaced by `role`.
    fn with_role(self, role: Role) -> Self {
        Self { role, ..self }
    }
}
/// Treats a list of mock messages as one backend response whose text is the
/// concatenation of every message's `content`.
impl ContextBackendResponse for Vec<MockMessage> {
    /// `ResponseType::Empty` when no message carries any text, `ResponseType::Content` otherwise.
    fn response_type(&self) -> ResponseType {
        // Inspect each message directly instead of allocating the concatenated
        // string just to test whether it is empty.
        if self.iter().all(|m| m.content.is_empty()) {
            ResponseType::Empty
        } else {
            ResponseType::Content
        }
    }

    /// The mock never produces reasoning output.
    fn reasoning_content(&self) -> Option<String> {
        None
    }

    /// Concatenated text of all messages, or `None` when there is no text at all.
    fn content(&self) -> Option<String> {
        // Empty contents add nothing to the concatenation, so no filtering is needed.
        let text: String = self.iter().map(|m| m.content.as_str()).collect();
        (!text.is_empty()).then_some(text)
    }

    /// The mock never produces tool calls.
    fn tool_calls(&self) -> Vec<ToolCallInfo> {
        Vec::new()
    }
}
/// Backend options used by the tests: a `CommonOpts` plus a marker value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MockOpts {
    /// The `CommonOpts` part of the options.
    pub common: CommonOpts,
    /// Arbitrary marker value chosen by each test.
    pub token: u64,
}

/// Exposes the embedded `CommonOpts` by reference.
impl AsRef<CommonOpts> for MockOpts {
    fn as_ref(&self) -> &CommonOpts {
        let Self { common, .. } = self;
        common
    }
}
/// Builds a `MockOpts` carrying `token`, with fixed common settings: model `"test"`,
/// a context window of 1000, 4096 max tokens, auto-compress off and no scratch.
fn mock_opts(token: u64) -> MockOpts {
    let common = CommonOpts {
        model: "test".into(),
        context_window: 1000,
        max_tokens: 4096,
        auto_compress: false,
        scratch: None,
    };
    MockOpts { common, token }
}
/// Configurable in-memory backend used to exercise `AgentContext`.
///
/// The two atomics live behind `Arc`, so a clone kept by a test observes what the
/// clone handed to the actor recorded.
#[derive(Clone)]
pub struct MockBackend {
    /// Messages returned by `send`, and emitted one per chunk by `send_stream`.
    pub send_response: Vec<MockMessage>,
    /// Value reported by `estimate_tokens`.
    pub token_count: usize,
    /// `token` of the options passed to the most recent `send` / `send_stream`.
    pub last_opts_token: Arc<std::sync::atomic::AtomicU64>,
    /// Number of messages passed to the most recent `send` / `send_stream`.
    pub last_sent_count: Arc<std::sync::atomic::AtomicUsize>,
}

impl ContextBackend for MockBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Wraps `content` in a `Role::User` message.
    fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::System` message.
    fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::Tool` message.
    ///
    /// # Panics
    /// Panics when `tool_call_id` is empty; the id itself is not stored in the mock message.
    fn tool_message(
        &self,
        tool_call_id: impl Into<String> + Send,
        content: impl Into<String> + Send,
    ) -> Self::Message {
        let id = tool_call_id.into();
        assert!(!id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: content.into(),
        }
    }

    /// Flattens every response into a single list of cloned messages.
    fn extract_messages(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
    }

    /// The mock never merges chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Reports the configured `token_count`, whatever the messages are.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(self.token_count)
    }

    /// Records the option token and the number of messages, then returns a copy of
    /// `send_response`.
    async fn send(
        &self,
        messages: &[Self::Message],
        opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        Ok(self.send_response.clone())
    }

    /// Records the option token and the number of messages, then streams each entry
    /// of `send_response` as its own single-message chunk.
    fn send_stream(
        &self,
        messages: Vec<Self::Message>,
        opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        self.last_opts_token
            .store(opts.token, std::sync::atomic::Ordering::SeqCst);
        // Mirror `send`: without this the counter would still hold the size of an
        // earlier non-streaming request after a streaming one.
        self.last_sent_count
            .store(messages.len(), std::sync::atomic::Ordering::SeqCst);
        let responses = self.send_response.clone();
        futures::stream::iter(responses.into_iter().map(|m| Ok(vec![m])))
    }
}
/// Spawns an `AgentContext` actor over a `MockBackend` that has no canned response,
/// a token count of 0 and zeroed recorders, passing `immutable` to `AgentContext::new`.
fn spawn_actor(immutable: Vec<MockMessage>) -> ActorRef<AgentContext<MockBackend>> {
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    let last_opts_token = Arc::new(AtomicU64::new(0));
    let last_sent_count = Arc::new(AtomicUsize::new(0));
    let ctx = AgentContext::new(
        MockBackend {
            send_response: Vec::new(),
            token_count: 0,
            last_opts_token,
            last_sent_count,
        },
        immutable,
    );
    AgentContext::spawn(ctx)
}
/// A context spawned with no immutable messages reports a length of 0 and is empty.
#[tokio::test]
async fn new_creates_empty_zones() {
    let agent = spawn_actor(Vec::new());

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 0);

    let is_empty: bool = agent.ask(RequestIsEmpty).await.unwrap();
    assert!(is_empty);

    agent.stop_gracefully().await.unwrap();
}
/// A message passed at construction counts towards the length, so the context is not empty.
#[tokio::test]
async fn new_with_immutable() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("system"),
    };
    let agent = spawn_actor(vec![system]);

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    let is_empty: bool = agent.ask(RequestIsEmpty).await.unwrap();
    assert!(!is_empty);

    agent.stop_gracefully().await.unwrap();
}
/// Appending a message delivers exactly one `NotifyChange` to a subscribed recipient.
#[tokio::test]
async fn append_and_on_change() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Actor that counts the change notifications it receives.
    #[derive(Actor)]
    struct ChangeCounter {
        count: Arc<AtomicUsize>,
    }
    impl Message<NotifyChange<MockMessage>> for ChangeCounter {
        type Reply = ();
        async fn handle(
            &mut self,
            _change: NotifyChange<MockMessage>,
            _ctx: &mut Context<Self, ()>,
        ) {
            self.count.fetch_add(1, Ordering::SeqCst);
        }
    }

    let count = Arc::new(AtomicUsize::new(0));
    let counter = ChangeCounter::spawn(ChangeCounter {
        count: count.clone(),
    });
    let recipient = counter.recipient::<NotifyChange<MockMessage>>();

    let backend = MockBackend {
        send_response: vec![],
        token_count: 0,
        last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    };
    let ctx = AgentContext::new(backend, vec![]);
    let actor_ref = AgentContext::spawn(ctx);

    actor_ref
        .ask(RequestSubscribeChange { recipient })
        .await
        .unwrap();
    actor_ref
        .ask(RequestAppend {
            message: MockMessage {
                role: Role::User,
                content: "hi".into(),
            },
        })
        .await
        .unwrap();

    // The notification reaches the counter asynchronously. Poll with an upper bound
    // instead of sleeping a fixed time, so a slow scheduler cannot fail the test
    // and a fast one does not have to wait.
    let delivered = tokio::time::timeout(Duration::from_secs(2), async {
        while count.load(Ordering::SeqCst) == 0 {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    })
    .await;
    assert!(
        delivered.is_ok(),
        "no change notification arrived within 2 seconds"
    );

    // Short grace period so that a duplicate notification would still be caught.
    tokio::time::sleep(Duration::from_millis(20)).await;
    assert_eq!(count.load(Ordering::SeqCst), 1);

    actor_ref.stop_gracefully().await.unwrap();
}
/// A message appended to an empty context can be read back at index 0.
#[tokio::test]
async fn append_and_get() {
    let agent = spawn_actor(Vec::new());

    let greeting = MockMessage {
        role: Role::User,
        content: String::from("hi"),
    };
    agent
        .ask(RequestAppend { message: greeting })
        .await
        .unwrap();

    let fetched: Option<MockMessage> = agent.ask(RequestGet(0)).await.unwrap();
    assert!(fetched.is_some());
    let first = fetched.unwrap();
    assert_eq!(first.content, "hi");

    agent.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn extend_multiple() {
let actor_ref = spawn_actor(vec![]);
actor_ref
.ask(RequestExtend {
messages: vec![
MockMessage {
role: Role::User,
content: "a".into(),
},
MockMessage {
role: Role::Assistant,
content: "b".into(),
},
],
})
.await
.unwrap();
let len: usize = actor_ref.ask(RequestLen).await.unwrap();
assert_eq!(len, 2);
actor_ref.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn get_crosses_zones() {
let immutable_msg = MockMessage {
role: Role::System,
content: "sys".into(),
};
let actor_ref = spawn_actor(vec![immutable_msg]);
actor_ref
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "user".into(),
},
})
.await
.unwrap();
let sys: Option<MockMessage> = actor_ref.ask(RequestGet(0)).await.unwrap();
assert_eq!(sys.unwrap().content, "sys");
let user: Option<MockMessage> = actor_ref.ask(RequestGet(1)).await.unwrap();
assert_eq!(user.unwrap().content, "user");
let none: Option<MockMessage> = actor_ref.ask(RequestGet(99)).await.unwrap();
assert!(none.is_none());
actor_ref.stop_gracefully().await.unwrap();
}
/// `RequestMessages` returns the construction-time message followed by the appended one.
#[tokio::test]
async fn messages_returns_all_zones() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let agent = spawn_actor(vec![system]);

    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    agent
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    let everything: Vec<MockMessage> = agent.ask(RequestMessages).await.unwrap();
    assert_eq!(everything.len(), 2);
    let (first, second) = (&everything[0], &everything[1]);
    assert_eq!(first.content, "sys");
    assert_eq!(second.content, "u");

    agent.stop_gracefully().await.unwrap();
}
/// After one append, `RequestIncremental` returns exactly one message.
#[tokio::test]
async fn incremental_readonly() {
    let agent = spawn_actor(Vec::new());

    let message = MockMessage {
        role: Role::User,
        content: String::from("x"),
    };
    agent.ask(RequestAppend { message }).await.unwrap();

    let incremental: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 1);

    agent.stop_gracefully().await.unwrap();
}
/// With user, assistant and user messages appended, a lookup by `Role::User` returns two.
#[tokio::test]
async fn find_by_role() {
    let agent = spawn_actor(Vec::new());

    // Appended one at a time, in this order.
    let conversation = [
        (Role::User, "u1"),
        (Role::Assistant, "a1"),
        (Role::User, "u2"),
    ];
    for (role, text) in conversation {
        let message = MockMessage {
            role,
            content: text.into(),
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    let from_user: Vec<MockMessage> = agent.ask(RequestFindByRole(Role::User)).await.unwrap();
    assert_eq!(from_user.len(), 2);

    agent.stop_gracefully().await.unwrap();
}
/// Updating index 0 replaces the message that was appended there.
#[tokio::test]
async fn update_message() {
    let user_says = |text: &str| MockMessage {
        role: Role::User,
        content: text.into(),
    };
    let agent = spawn_actor(Vec::new());

    agent
        .ask(RequestAppend {
            message: user_says("old"),
        })
        .await
        .unwrap();

    let replacement = RequestUpdate {
        index: 0,
        message: user_says("new"),
    };
    agent.ask(replacement).await.unwrap();

    let current: Option<MockMessage> = agent.ask(RequestGet(0)).await.unwrap();
    assert_eq!(current.unwrap().content, "new");

    agent.stop_gracefully().await.unwrap();
}
/// Updating index 0 of an empty context is reported as an error.
#[tokio::test]
async fn update_out_of_bounds() {
    let agent = spawn_actor(Vec::new());

    let request = RequestUpdate {
        index: 0,
        message: MockMessage {
            role: Role::User,
            content: String::from("x"),
        },
    };
    let outcome = agent.ask(request).await;
    assert!(outcome.is_err());

    agent.stop_gracefully().await.unwrap();
}
/// Inserting at index 0 places the new message before the one already appended.
#[tokio::test]
async fn insert_message() {
    let user_says = |text: &str| MockMessage {
        role: Role::User,
        content: text.into(),
    };
    let agent = spawn_actor(Vec::new());

    agent
        .ask(RequestAppend {
            message: user_says("a"),
        })
        .await
        .unwrap();

    let insertion = RequestInsert {
        index: 0,
        message: user_says("b"),
    };
    agent.ask(insertion).await.unwrap();

    let incremental: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental[0].content, "b");
    assert_eq!(incremental[1].content, "a");

    agent.stop_gracefully().await.unwrap();
}
/// Removing index 0 of two appended messages leaves only the second one.
#[tokio::test]
async fn remove_message() {
    let agent = spawn_actor(Vec::new());

    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.into(),
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    agent.ask(RequestRemove { index: 0 }).await.unwrap();

    let remaining: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].content, "b");

    agent.stop_gracefully().await.unwrap();
}
/// Popping returns the most recently appended message and shrinks the length by one.
#[tokio::test]
async fn pop_message() {
    let agent = spawn_actor(Vec::new());

    for text in ["a", "b"] {
        let message = MockMessage {
            role: Role::User,
            content: text.into(),
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    let last: Option<MockMessage> = agent.ask(RequestPop).await.unwrap();
    assert_eq!(last.unwrap().content, "b");

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    agent.stop_gracefully().await.unwrap();
}
/// Retaining `Role::User` drops the assistant message and keeps both user messages.
#[tokio::test]
async fn retain_by_role() {
    let agent = spawn_actor(Vec::new());

    // Appended one at a time, in this order.
    let conversation = [
        (Role::User, "u"),
        (Role::Assistant, "a"),
        (Role::User, "u2"),
    ];
    for (role, text) in conversation {
        let message = MockMessage {
            role,
            content: text.into(),
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    agent
        .ask(RequestRetain { role: Role::User })
        .await
        .unwrap();

    let kept: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(kept.len(), 2);
    assert!(!kept.iter().any(|m| m.role() != Role::User));

    agent.stop_gracefully().await.unwrap();
}
/// Clearing removes the appended message but keeps the one supplied at construction.
#[tokio::test]
async fn clear_incremental() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let agent = spawn_actor(vec![system]);

    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    agent
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    agent.ask(RequestClear).await.unwrap();

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    agent.stop_gracefully().await.unwrap();
}
/// Clearing a context that holds no messages succeeds.
#[tokio::test]
async fn clear_empty_is_noop() {
    let agent = spawn_actor(Vec::new());

    agent.ask(RequestClear).await.unwrap();

    agent.stop_gracefully().await.unwrap();
}
/// Summarizing five appended messages with `keep: 2` leaves two incremental messages
/// and one compressed message whose role is `Role::System`.
#[tokio::test]
async fn compress_summarize() {
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    // The backend answers the summarization request with this single reply.
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "摘要".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    for content in (0..5).map(|i| format!("msg{i}")) {
        let message = MockMessage {
            role: Role::User,
            content,
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    let strategy = CompressStrategy::Summarize {
        keep: 2,
        prompt: None,
    };
    agent
        .ask(RequestCompress {
            strategy,
            opts: mock_opts(0),
        })
        .await
        .unwrap();

    let incremental: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 2);

    let compressed: Vec<MockMessage> = agent.ask(RequestCompressed).await.unwrap();
    assert_eq!(compressed.len(), 1);
    assert_eq!(compressed[0].role, Role::System);

    agent.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn compress_noop_when_below_keep() {
let backend = MockBackend {
send_response: vec![],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend, vec![]);
let actor_ref = AgentContext::spawn(ctx);
actor_ref
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "only".into(),
},
})
.await
.unwrap();
actor_ref
.ask(RequestCompress {
strategy: CompressStrategy::Summarize {
keep: 5,
prompt: None,
},
opts: mock_opts(0),
})
.await
.unwrap();
let inc: Vec<MockMessage> = actor_ref.ask(RequestIncremental).await.unwrap();
assert_eq!(inc.len(), 1);
actor_ref.stop_gracefully().await.unwrap();
}
/// `RequestSend` returns the backend reply and that reply also shows up in the
/// incremental messages.
#[tokio::test]
async fn send_appends_to_incremental() {
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "reply".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    let request = RequestSend { opts: mock_opts(0) };
    let raw = agent.ask(request).await.unwrap();
    assert_eq!(raw[0].content, "reply");

    let incremental: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 1);
    assert_eq!(incremental[0].content, "reply");

    agent.stop_gracefully().await.unwrap();
}
/// Drains a stream of two chunks, appends every message from `take_chunks()` by hand,
/// and checks that two incremental messages result.
#[tokio::test]
async fn send_stream_collects_and_stores() {
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    let send_response: Vec<MockMessage> = ["chunk1", "chunk2"]
        .into_iter()
        .map(|text| MockMessage {
            role: Role::Assistant,
            content: text.into(),
        })
        .collect();
    let backend = MockBackend {
        send_response,
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    // Scoped so the stream is dropped before the final assertions.
    {
        use futures::StreamExt;

        let request = RequestSendStream { opts: mock_opts(0) };
        let mut stream = agent.ask(request).await.unwrap();

        let mut seen = 0usize;
        loop {
            if stream.next().await.is_none() {
                break;
            }
            seen += 1;
        }
        assert!(seen > 0, "Stream should have at least one item");

        for chunk in stream.take_chunks() {
            for message in chunk {
                agent.ask(RequestAppend { message }).await.unwrap();
            }
        }
    }

    let incremental: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(incremental.len(), 2);

    agent.stop_gracefully().await.unwrap();
}
/// `RequestEstimateTokens` reports the value the backend returns (42 here).
#[tokio::test]
async fn estimate_tokens_delegates() {
    use std::sync::atomic::{AtomicU64, AtomicUsize};

    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 42,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    let estimate: usize = agent.ask(RequestEstimateTokens).await.unwrap();
    assert_eq!(estimate, 42);

    agent.stop_gracefully().await.unwrap();
}
/// The options given to `RequestSend` reach `backend.send()`: the backend records token 42.
#[tokio::test]
async fn send_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    // Keep a handle on the shared recorder; the backend itself moves into the actor.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    let request = RequestSend {
        opts: mock_opts(42),
    };
    let response = agent.ask(request).await.unwrap();
    assert_eq!(
        response.len(),
        0,
        "Empty response expected for opts passthrough test"
    );
    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        42,
        "RequestSend 应将 opts 透传给 backend.send()"
    );

    agent.stop_gracefully().await.unwrap();
}
/// The options given to `RequestSendStream` reach `backend.send_stream()`: the backend
/// records token 7, and the stream for an empty response yields nothing.
#[tokio::test]
async fn send_stream_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    // Keep a handle on the shared recorder; the backend itself moves into the actor.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: Vec::new(),
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    // Scoped so the stream is dropped before the final assertion.
    {
        use futures::StreamExt;

        let request = RequestSendStream { opts: mock_opts(7) };
        let mut stream = agent.ask(request).await.unwrap();

        let mut seen = 0usize;
        loop {
            if stream.next().await.is_none() {
                break;
            }
            seen += 1;
        }
        assert_eq!(seen, 0, "Empty stream expected for opts passthrough test");
    }

    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        7,
        "RequestSendStream 应将 opts 透传给 backend.send_stream()"
    );

    agent.stop_gracefully().await.unwrap();
}
/// The options given to `RequestCompress` reach `backend.send()`: the backend records token 99.
#[tokio::test]
async fn compress_opts_passthrough() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    // Keep a handle on the shared recorder; the backend itself moves into the actor.
    let seen_token = Arc::new(AtomicU64::new(0));
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "摘要".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::clone(&seen_token),
        last_sent_count: Arc::new(AtomicUsize::new(0)),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    for content in (0..5).map(|i| format!("msg{i}")) {
        let message = MockMessage {
            role: Role::User,
            content,
        };
        agent.ask(RequestAppend { message }).await.unwrap();
    }

    let strategy = CompressStrategy::Summarize {
        keep: 2,
        prompt: None,
    };
    agent
        .ask(RequestCompress {
            strategy,
            opts: mock_opts(99),
        })
        .await
        .unwrap();

    assert_eq!(
        seen_token.load(Ordering::SeqCst),
        99,
        "RequestCompress 应将 opts 透传给 backend.send()"
    );

    agent.stop_gracefully().await.unwrap();
}
/// Backend whose `extract_messages` always fails; `send` succeeds with an empty
/// response and `send_stream` yields nothing.
#[derive(Clone)]
struct ExtractErrorBackend;

impl ContextBackend for ExtractErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Wraps `content` in a `Role::User` message.
    fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::System` message.
    fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::Tool` message; panics when the id is empty.
    fn tool_message(
        &self,
        tool_call_id: impl Into<String> + Send,
        content: impl Into<String> + Send,
    ) -> Self::Message {
        let id: String = tool_call_id.into();
        assert!(!id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: content.into(),
        }
    }

    /// Always fails with an `AgentError::Context`.
    fn extract_messages(
        &self,
        _responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        Err(AgentError::Context("模拟提取失败".into()))
    }

    /// Never merges chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Always estimates 0 tokens.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Ok(0)
    }

    /// Succeeds with an empty response.
    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    /// Returns a stream that ends immediately.
    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
#[derive(Clone)]
struct ToRequestErrorBackend;
impl ContextBackend for ToRequestErrorBackend {
type Message = MockMessage;
type Opts = MockOpts;
type Response = Vec<MockMessage>;
fn to_request_messages(&self, _: Vec<MockMessage>) -> Result<Vec<MockMessage>, AgentError> {
Err(AgentError::Context("模拟转换失败".into()))
}
fn merge_chunks(&self, _: &[Self::Response]) -> Option<Self::Message> {
None
}
fn user_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::User,
content: c.into(),
}
}
fn system_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage {
role: Role::System,
content: c.into(),
}
}
fn tool_message(
&self,
id: impl Into<String> + Send,
c: impl Into<String> + Send,
) -> MockMessage {
let tool_call_id = id.into();
assert!(!tool_call_id.is_empty(), "tool_call_id should not be empty");
MockMessage {
role: Role::Tool,
content: c.into(),
}
}
fn extract_messages(
&self,
responses: &[Vec<MockMessage>],
) -> Result<Vec<MockMessage>, AgentError> {
Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
}
async fn estimate_tokens(&self, _: &[MockMessage]) -> Result<usize, AgentError> {
Ok(0)
}
async fn send(
&self,
_: &[MockMessage],
_: &MockOpts,
) -> Result<Vec<MockMessage>, AgentError> {
Ok(vec![])
}
fn send_stream(
&self,
_: Vec<MockMessage>,
_: MockOpts,
) -> impl futures_core::Stream<Item = Result<Vec<MockMessage>, AgentError>> + Send + 'static
{
futures::stream::empty()
}
}
/// Backend whose `estimate_tokens` always fails; every other operation succeeds
/// with empty or pass-through results.
#[derive(Clone)]
struct EstimateErrorBackend;

impl ContextBackend for EstimateErrorBackend {
    type Message = MockMessage;
    type Opts = MockOpts;
    type Response = Vec<MockMessage>;

    /// Wraps `content` in a `Role::User` message.
    fn user_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::User,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::System` message.
    fn system_message(&self, content: impl Into<String> + Send) -> Self::Message {
        MockMessage {
            role: Role::System,
            content: content.into(),
        }
    }

    /// Wraps `content` in a `Role::Tool` message; panics when the id is empty.
    fn tool_message(
        &self,
        tool_call_id: impl Into<String> + Send,
        content: impl Into<String> + Send,
    ) -> Self::Message {
        let id: String = tool_call_id.into();
        assert!(!id.is_empty(), "tool_call_id should not be empty");
        MockMessage {
            role: Role::Tool,
            content: content.into(),
        }
    }

    /// Flattens every response into a single list of cloned messages.
    fn extract_messages(
        &self,
        responses: &[Self::Response],
    ) -> Result<Vec<Self::Message>, AgentError> {
        let mut flattened = Vec::new();
        for response in responses {
            flattened.extend(response.iter().cloned());
        }
        Ok(flattened)
    }

    /// Never merges chunks.
    fn merge_chunks(&self, _responses: &[Self::Response]) -> Option<Self::Message> {
        None
    }

    /// Always fails with an `AgentError::Context`.
    async fn estimate_tokens(&self, _messages: &[Self::Message]) -> Result<usize, AgentError> {
        Err(AgentError::Context("模拟估算失败".into()))
    }

    /// Succeeds with an empty response.
    async fn send(
        &self,
        _messages: &[Self::Message],
        _opts: &Self::Opts,
    ) -> Result<Self::Response, AgentError> {
        Ok(Vec::new())
    }

    /// Returns a stream that ends immediately.
    fn send_stream(
        &self,
        _messages: Vec<Self::Message>,
        _opts: Self::Opts,
    ) -> impl futures_core::Stream<Item = Result<Self::Response, AgentError>> + Send + 'static
    {
        futures::stream::empty()
    }
}
/// A failure in `extract_messages` makes `RequestSend` return an error.
#[tokio::test]
async fn send_extract_error_propagates() {
    let agent = AgentContext::spawn(AgentContext::new(ExtractErrorBackend, vec![]));

    let request = RequestSend { opts: mock_opts(0) };
    let outcome = agent.ask(request).await;
    assert!(outcome.is_err(), "extract 失败应传播错误");

    agent.stop_gracefully().await.unwrap();
}
/// A failure in `to_request_messages` makes `RequestSend` return an error.
#[tokio::test]
async fn send_to_request_error_propagates() {
    let agent = AgentContext::spawn(AgentContext::new(ToRequestErrorBackend, vec![]));

    let request = RequestSend { opts: mock_opts(0) };
    let outcome = agent.ask(request).await;
    assert!(outcome.is_err(), "to_request 失败应传播错误");

    agent.stop_gracefully().await.unwrap();
}
/// When the backend's `estimate_tokens` fails, `RequestEstimateTokens` still succeeds
/// and reports 0.
#[tokio::test]
async fn estimate_tokens_error_returns_zero() {
    let agent = AgentContext::spawn(AgentContext::new(EstimateErrorBackend, vec![]));

    let estimate: usize = agent.ask(RequestEstimateTokens).await.unwrap();
    assert_eq!(estimate, 0, "估算失败应降级返回 0");

    agent.stop_gracefully().await.unwrap();
}
/// `RequestExportAll` produces one line per message: the construction-time message
/// first, then the appended one.
#[tokio::test]
async fn to_json_exports_all_messages() {
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let agent = spawn_actor(vec![system]);

    let appended = MockMessage {
        role: Role::User,
        content: String::from("hello"),
    };
    agent
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    let exported: String = agent.ask(RequestExportAll).await.unwrap();
    let rows: Vec<&str> = exported.lines().collect();
    assert_eq!(rows.len(), 2);
    let (first, second) = (rows[0], rows[1]);
    assert!(first.contains("sys"));
    assert!(second.contains("hello"));

    agent.stop_gracefully().await.unwrap();
}
/// Importing two JSON lines adds two messages, in the order they appear in the input.
#[tokio::test]
async fn from_json_loads_messages() {
    let agent = spawn_actor(Vec::new());

    // One JSON object per line; the literal must stay exactly as written.
    let json = r#"{"role":"User","content":"hello"}
{"role":"Assistant","content":"hi there"}"#;
    let import = RequestImportIncremental { json: json.into() };
    agent.ask(import).await.unwrap();

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 2);

    let loaded: Vec<MockMessage> = agent.ask(RequestIncremental).await.unwrap();
    assert_eq!(loaded[0].content, "hello");
    assert_eq!(loaded[1].content, "hi there");

    agent.stop_gracefully().await.unwrap();
}
/// Blank lines around a single JSON line are ignored: exactly one message is imported.
#[tokio::test]
async fn from_json_skips_empty_lines() {
    let agent = spawn_actor(Vec::new());

    let padded = "\n\n{\"role\":\"User\",\"content\":\"only\"}\n\n";
    let import = RequestImportIncremental {
        json: padded.into(),
    };
    agent.ask(import).await.unwrap();

    let total: usize = agent.ask(RequestLen).await.unwrap();
    assert_eq!(total, 1);

    agent.stop_gracefully().await.unwrap();
}
/// Importing text that is not JSON is reported as an error.
#[tokio::test]
async fn from_json_invalid_returns_error() {
    let agent = spawn_actor(Vec::new());

    let import = RequestImportIncremental {
        json: "not json".into(),
    };
    let outcome = agent.ask(import).await;
    assert!(outcome.is_err());

    agent.stop_gracefully().await.unwrap();
}
/// Text exported from one context can be imported into a fresh one, which then holds
/// the same two messages in the same order.
#[tokio::test]
async fn to_json_then_from_json_roundtrip() {
    // Source context: one construction-time message plus one appended message.
    let system = MockMessage {
        role: Role::System,
        content: String::from("sys"),
    };
    let source = spawn_actor(vec![system]);
    let appended = MockMessage {
        role: Role::User,
        content: String::from("u"),
    };
    source
        .ask(RequestAppend { message: appended })
        .await
        .unwrap();

    let json: String = source.ask(RequestExportAll).await.unwrap();
    assert!(!json.is_empty());

    // Target context starts empty and receives everything through the import.
    let target = spawn_actor(Vec::new());
    target
        .ask(RequestImportIncremental { json })
        .await
        .unwrap();

    let total: usize = target.ask(RequestLen).await.unwrap();
    assert_eq!(total, 2);

    let imported: Vec<MockMessage> = target.ask(RequestMessages).await.unwrap();
    assert_eq!(imported[0].content, "sys");
    assert_eq!(imported[1].content, "u");

    target.stop_gracefully().await.unwrap();
    source.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn send_msg_with_scratch_appends_to_sent_messages() {
use std::sync::atomic::Ordering;
let reply = MockMessage {
role: Role::Assistant,
content: "reply".into(),
};
let backend = MockBackend {
send_response: vec![reply],
token_count: 0,
last_opts_token: Arc::new(std::sync::atomic::AtomicU64::new(0)),
last_sent_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
};
let ctx = AgentContext::new(backend.clone(), vec![]);
let actor_ref = AgentContext::spawn(ctx);
actor_ref
.ask(RequestAppend {
message: MockMessage {
role: Role::User,
content: "hello".into(),
},
})
.await
.unwrap();
let response = actor_ref
.ask(RequestSend {
opts: MockOpts {
common: CommonOpts {
model: "test".into(),
context_window: 1000,
max_tokens: 4096,
auto_compress: false,
scratch: Some("当前时间: 2026-05-28 20:00".into()),
},
token: 0,
},
})
.await
.unwrap();
assert_eq!(response.len(), 1, "Should have one response message");
assert_eq!(
backend.last_sent_count.load(Ordering::SeqCst),
2,
"scratch 应作为额外 system 消息追加"
);
let inc: Vec<MockMessage> = actor_ref.ask(RequestIncremental).await.unwrap();
assert_eq!(inc.len(), 2);
assert_eq!(inc[0].role, Role::User);
assert_eq!(inc[1].role, Role::Assistant);
assert_eq!(inc[1].content, "reply");
actor_ref.stop_gracefully().await.unwrap();
}
/// With `scratch` unset, the backend receives exactly the one appended message.
#[tokio::test]
async fn send_msg_without_scratch_is_unchanged() {
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    // Keep a handle on the shared recorder; the backend itself moves into the actor.
    let sent_count = Arc::new(AtomicUsize::new(0));
    let backend = MockBackend {
        send_response: vec![MockMessage {
            role: Role::Assistant,
            content: "reply".into(),
        }],
        token_count: 0,
        last_opts_token: Arc::new(AtomicU64::new(0)),
        last_sent_count: Arc::clone(&sent_count),
    };
    let agent = AgentContext::spawn(AgentContext::new(backend, vec![]));

    let question = MockMessage {
        role: Role::User,
        content: String::from("hello"),
    };
    agent
        .ask(RequestAppend { message: question })
        .await
        .unwrap();

    let request = RequestSend { opts: mock_opts(0) };
    let response = agent.ask(request).await.unwrap();

    assert_eq!(response.len(), 1, "Should have one response message");
    assert_eq!(
        sent_count.load(Ordering::SeqCst),
        1,
        "scratch 为 None 时不追加额外消息"
    );

    agent.stop_gracefully().await.unwrap();
}
#[tokio::test]
async fn compress_if_full_includes_scratch_in_token_estimation() {
use std::sync::atomic::Ordering;
let estimated = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let estimated_clone = estimated.clone();
#[derive(Clone)]
struct CountingBackend {
estimated: Arc<std::sync::atomic::AtomicUsize>,
}
impl ContextBackend for CountingBackend {
type Message = MockMessage;
type Opts = MockOpts;
type Response = Vec<MockMessage>;
fn user_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage { role: Role::User, content: c.into() }
}
fn system_message(&self, c: impl Into<String> + Send) -> MockMessage {
MockMessage { role: Role::System, content: c.into() }
}
fn tool_message(&self, id: impl Into<String> + Send, c: impl Into<String> + Send) -> MockMessage {
let tool_call_id = id.into();
assert!(!tool_call_id.is_empty());
MockMessage { role: Role::Tool, content: c.into() }
}
fn to_request_messages(&self, msgs: Vec<MockMessage>) -> Result<Vec<MockMessage>, AgentError> {
Ok(msgs)
}
fn to_system_message(&self, msg: MockMessage) -> MockMessage {
MockMessage { role: Role::System, content: msg.content }
}
fn extract_messages(&self, responses: &[Vec<MockMessage>]) -> Result<Vec<MockMessage>, AgentError> {
Ok(responses.iter().flat_map(|r| r.iter().cloned()).collect())
}
fn merge_chunks(&self, _: &[Vec<MockMessage>]) -> Option<MockMessage> { None }
fn message_to_json(&self, m: &MockMessage) -> Result<String, AgentError> {
serde_json::to_string(m).map_err(|e| AgentError::Context(e.to_string()))
}
fn message_from_json(&self, line: &str) -> Result<MockMessage, AgentError> {
serde_json::from_str(line).map_err(|e| AgentError::Context(e.to_string()))
}
async fn estimate_tokens(&self, messages: &[MockMessage]) -> Result<usize, AgentError> {
self.estimated.store(messages.len(), Ordering::SeqCst);
Ok(0)
}
async fn send(&self, _: &[MockMessage], _: &MockOpts) -> Result<Vec<MockMessage>, AgentError> {
Ok(vec![])
}
fn send_stream(&self, _: Vec<MockMessage>, _: MockOpts) -> impl futures_core::Stream<Item = Result<Vec<MockMessage>, AgentError>> + Send + 'static {
futures::stream::empty()
}
}
let backend = CountingBackend { estimated: estimated_clone };
let ctx = AgentContext::new(backend, vec![MockMessage { role: Role::System, content: "sys".into() }]);
let actor_ref = AgentContext::spawn(ctx);
actor_ref.ask(RequestAppend {
message: MockMessage { role: Role::User, content: "hello".into() },
}).await.unwrap();
let result = actor_ref.ask(RequestSend {
opts: MockOpts {
common: CommonOpts {
model: "test".into(),
context_window: 1000,
max_tokens: 4096,
auto_compress: false,
scratch: None,
},
token: 0,
},
}).await;
assert!(result.is_ok());
assert_eq!(estimated.load(Ordering::SeqCst), 2, "无 scratch 时应估算 2 条消息");
let result2 = actor_ref.ask(RequestSend {
opts: MockOpts {
common: CommonOpts {
model: "test".into(),
context_window: 1000,
max_tokens: 4096,
auto_compress: false,
scratch: Some("scratch内容".into()),
},
token: 0,
},
}).await;
assert!(result2.is_ok());
assert_eq!(estimated.load(Ordering::SeqCst), 3, "有 scratch 时应估算 3 条消息");
actor_ref.stop_gracefully().await.unwrap();
}
}