use adk_core::{
Agent, BaseEventsSummarizer, Content, Event, EventActions, EventCompaction, EventStream,
EventsCompactionConfig, InvocationContext, Part, Result, SessionId, UserId,
};
use adk_runner::{Runner, RunnerConfig};
use adk_session::{InMemorySessionService, SessionService};
use async_trait::async_trait;
use futures::StreamExt;
use std::sync::{Arc, Mutex};
/// Test agent that records the conversation history it observes on each `run` call.
///
/// Its `Agent::run` implementation pushes one `Vec<Content>` snapshot (taken from
/// `ctx.session().conversation_history()`) into `captured_histories` per invocation.
struct HistoryCapturingAgent {
/// Value returned by `Agent::name` and used as the author of the emitted event.
name: String,
/// Shared, mutex-guarded list of history snapshots; one entry is appended per `run` call.
captured_histories: Arc<Mutex<Vec<Vec<Content>>>>,
}
/// `Agent` implementation that snapshots the session's conversation history on every
/// `run` call and then answers with a single fixed model event.
#[async_trait]
impl Agent for HistoryCapturingAgent {
    /// Returns the name the agent was constructed with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns a fixed, human-readable description of this test agent.
    fn description(&self) -> &str {
        "Captures conversation history for testing"
    }

    /// This agent has no sub-agents, so the slice is always empty.
    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
        &[]
    }

    /// Records the current conversation history, then yields exactly one event.
    ///
    /// The history is read from `ctx.session()` before the lock is taken and appended to
    /// `captured_histories`. The returned stream produces a single event authored by this
    /// agent whose content is a `"model"` message with the text `"Agent response"`.
    ///
    /// # Panics
    /// Panics if the `captured_histories` mutex is poisoned.
    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
        let snapshot = ctx.session().conversation_history();
        {
            // Keep the guard confined to the push.
            let mut recorded = self.captured_histories.lock().unwrap();
            recorded.push(snapshot);
        }

        let author = self.name.clone();
        Ok(Box::pin(futures::stream::once(async move {
            let reply = Content::new("model").with_text("Agent response");
            let mut event = Event::new("inv-e2e");
            event.author = author;
            event.set_content(reply);
            Ok(event)
        })))
    }
}
/// Test summarizer that keeps a running count of how often it has been invoked.
struct DeterministicSummarizer {
    /// Invocation counter, guarded by a mutex so it can be updated through `&self`.
    call_count: Mutex<u32>,
}

impl DeterministicSummarizer {
    /// Creates a summarizer whose invocation counter starts at zero.
    fn new() -> Self {
        let call_count = Mutex::new(0);
        Self { call_count }
    }

    /// Returns the current value of the invocation counter.
    ///
    /// # Panics
    /// Panics if the counter mutex is poisoned.
    fn call_count(&self) -> u32 {
        let guard = self.call_count.lock().unwrap();
        *guard
    }
}
/// Summarizer implementation that produces a predictable compaction event for tests.
#[async_trait]
impl BaseEventsSummarizer for DeterministicSummarizer {
    /// Bumps the invocation counter and, for a non-empty slice, builds a compaction event.
    ///
    /// The counter is incremented on every call, including calls with an empty slice.
    /// For empty input the result is `Ok(None)`. Otherwise the returned event is authored
    /// by `"system"` and carries an `EventCompaction` spanning the timestamps of the first
    /// and last input events, with a `"model"` text summary of the form
    /// `[Compaction #<call number>: summarized <event count> events]`.
    ///
    /// # Panics
    /// Panics if the counter mutex is poisoned.
    async fn summarize_events(&self, events: &[Event]) -> Result<Option<Event>> {
        let invocation = {
            let mut calls = self.call_count.lock().unwrap();
            *calls += 1;
            *calls
        };

        // An empty slice has neither a first nor a last event, so there is nothing to compact.
        let (Some(oldest), Some(newest)) = (events.first(), events.last()) else {
            return Ok(None);
        };

        let label = format!("[Compaction #{}: summarized {} events]", invocation, events.len());
        let compaction = EventCompaction {
            start_timestamp: oldest.timestamp,
            end_timestamp: newest.timestamp,
            compacted_content: Content::new("model").with_text(&label),
        };

        let mut summary = Event::new("compaction");
        summary.author = "system".to_string();
        summary.actions = EventActions { compaction: Some(compaction), ..Default::default() };
        Ok(Some(summary))
    }
}
/// Runs two invocations against an in-memory session with `compaction_interval: 2` and
/// checks that the summarizer was called once and that exactly one compaction event,
/// carrying the "Compaction #1" summary, was stored in the session.
#[tokio::test]
async fn test_e2e_compaction_with_inmemory_session() {
    // --- Fixtures -----------------------------------------------------------------------
    let summarizer = Arc::new(DeterministicSummarizer::new());
    // Filled by the agent on each run; this test does not assert on its contents.
    let captured_histories = Arc::new(Mutex::new(Vec::new()));
    let session_service = Arc::new(InMemorySessionService::new());
    let agent = Arc::new(HistoryCapturingAgent {
        name: "test_agent".to_string(),
        captured_histories: captured_histories.clone(),
    });

    // The session must exist before the runner can be driven against it.
    let create_request = adk_session::CreateRequest {
        app_name: "test_app".to_string(),
        user_id: "user-1".to_string(),
        session_id: Some("sess-e2e".to_string()),
        state: Default::default(),
    };
    session_service.create(create_request).await.unwrap();

    // Compaction is configured to fire every 2 invocations with no overlap.
    let compaction_config = EventsCompactionConfig {
        compaction_interval: 2,
        overlap_size: 0,
        summarizer: summarizer.clone(),
    };
    let runner_config = RunnerConfig {
        app_name: "test_app".to_string(),
        agent: agent.clone(),
        session_service: session_service.clone(),
        artifact_service: None,
        memory_service: None,
        plugin_manager: None,
        run_config: None,
        compaction_config: Some(compaction_config),
        context_cache_config: None,
        cache_capable: None,
        request_context: None,
        cancellation_token: None,
        intra_compaction_config: None,
        intra_compaction_summarizer: None,
    };
    let runner = Runner::new(runner_config).unwrap();

    // --- Invocation 1 -------------------------------------------------------------------
    let greeting = Content::new("user").with_text("Hello");
    let mut first_run = runner
        .run(UserId::new("user-1").unwrap(), SessionId::new("sess-e2e").unwrap(), greeting)
        .await
        .unwrap();
    while let Some(outcome) = first_run.next().await {
        assert!(outcome.is_ok(), "Invocation 1 failed: {:?}", outcome.err());
    }

    // --- Invocation 2 -------------------------------------------------------------------
    let follow_up = Content::new("user").with_text("How are you?");
    let mut second_run = runner
        .run(UserId::new("user-1").unwrap(), SessionId::new("sess-e2e").unwrap(), follow_up)
        .await
        .unwrap();
    while let Some(outcome) = second_run.next().await {
        assert!(outcome.is_ok(), "Invocation 2 failed: {:?}", outcome.err());
    }

    // --- Assertions ---------------------------------------------------------------------
    assert_eq!(
        summarizer.call_count(),
        1,
        "Summarizer should have been called once after 2 invocations"
    );

    let get_request = adk_session::GetRequest {
        app_name: "test_app".to_string(),
        user_id: "user-1".to_string(),
        session_id: "sess-e2e".to_string(),
        num_recent_events: None,
        after: None,
    };
    let session = session_service.get(get_request).await.unwrap();
    let events = session.events().all();

    // Collect the compaction payloads of every stored event that carries one.
    let compaction_events: Vec<_> =
        events.iter().filter_map(|e| e.actions.compaction.as_ref()).collect();
    assert_eq!(
        compaction_events.len(),
        1,
        "Expected 1 compaction event in session, found {}",
        compaction_events.len()
    );

    let compaction = compaction_events[0];
    let Part::Text { text: summary_text } = &compaction.compacted_content.parts[0] else {
        panic!("Expected text part in compaction summary");
    };
    assert!(
        summary_text.contains("Compaction #1"),
        "Summary should contain compaction marker, got: {}",
        summary_text
    );
}
/// Serializes an `EventActions` holding an `EventCompaction` to JSON and back, then checks
/// that both timestamps, the content role and the summary text come back unchanged.
#[test]
fn test_event_compaction_serde_roundtrip() {
    let window_start = chrono::Utc::now() - chrono::Duration::minutes(5);
    let window_end = chrono::Utc::now();
    let original = EventCompaction {
        start_timestamp: window_start,
        end_timestamp: window_end,
        compacted_content: Content::new("model").with_text("Summary of conversation"),
    };
    let actions = EventActions { compaction: Some(original.clone()), ..Default::default() };

    // Round trip through JSON.
    let encoded = serde_json::to_string(&actions).unwrap();
    let decoded: EventActions = serde_json::from_str(&encoded).unwrap();
    let restored = decoded.compaction.unwrap();

    assert_eq!(restored.start_timestamp, original.start_timestamp);
    assert_eq!(restored.end_timestamp, original.end_timestamp);
    assert_eq!(restored.compacted_content.role, "model");

    let Part::Text { text } = &restored.compacted_content.parts[0] else {
        panic!("Expected text part");
    };
    assert_eq!(text.as_str(), "Summary of conversation");
}