use chrono::Utc;
use phi_core::agent_loop::{agent_loop, AgentLoopConfig};
use phi_core::context::{
BlockCompactionStrategy, CompactedSection, CompactionBlock, CompactionConfig,
DefaultBlockCompaction, TurnMap, TurnRange,
};
use phi_core::provider::{MockProvider, ModelConfig, StreamProvider};
use phi_core::session::{LoopRecord, LoopStatus};
use phi_core::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Builds a completed [`LoopRecord`] fixture holding `num_turns` turns.
///
/// Every turn contributes two messages that share one `TurnId` (the given
/// `loop_id` plus the zero-based turn index): a user question followed by an
/// assistant answer. The remaining record fields carry fixed placeholder
/// values (`"test-session"`, `"test-agent"`, empty collections, `None`).
fn make_loop_record(loop_id: &str, num_turns: u32) -> LoopRecord {
    // Two messages per turn, emitted in question -> answer order.
    let messages: Vec<_> = (0..num_turns)
        .flat_map(|turn_index| {
            let turn_id = Some(TurnId {
                loop_id: loop_id.to_string(),
                turn_index,
            });

            let question =
                AgentMessage::from(Message::user(format!("Turn {} question", turn_index)))
                    .with_turn_id(turn_id.clone());

            let answer = AgentMessage::from(Message::Assistant {
                content: vec![Content::Text {
                    text: format!(
                        "Turn {} answer with some content that is meaningful",
                        turn_index
                    ),
                }],
                stop_reason: StopReason::Stop,
                model: "test".into(),
                provider: "test".into(),
                usage: Usage::default(),
                timestamp: 0,
                error_message: None,
            })
            .with_turn_id(turn_id);

            [question, answer]
        })
        .collect();

    // Start is sampled before end, exactly as a real loop would record them.
    let started_at = Utc::now();
    let ended_at = Some(Utc::now());

    LoopRecord {
        // Identity
        loop_id: loop_id.to_string(),
        session_id: "test-session".to_string(),
        agent_id: "test-agent".to_string(),
        parent_loop_id: None,
        continuation_kind: ContinuationKind::Initial,
        // Lifecycle
        started_at,
        ended_at,
        status: LoopStatus::Completed,
        rejection: None,
        config: None,
        // Payload
        messages,
        turns: Vec::new(),
        usage: Usage::default(),
        metadata: None,
        events: Vec::new(),
        // Relationships to other loops / compaction state
        children_loop_ids: Vec::new(),
        child_loop_refs: Vec::new(),
        parallel_group: None,
        compaction_block: None,
    }
}
/// Builds an [`AgentLoopConfig`] that routes every model call to `provider`.
///
/// The supplied provider is installed as `provider_override`; every optional
/// hook, limit and filter is left unset (`None` / empty) and the remaining
/// fields use their `Default` values or the fixed values spelled out below.
fn make_config(provider: Arc<dyn StreamProvider>) -> AgentLoopConfig {
    let model_config = ModelConfig::anthropic("mock", "mock", "test");
    let provider_override = Some(provider);

    AgentLoopConfig {
        // Model / provider selection
        model_config,
        provider_override,
        thinking_level: ThinkingLevel::Off,
        max_tokens: None,
        temperature: None,
        response_format: phi_core::provider::ResponseFormat::Text,
        retry_config: phi_core::RetryConfig::default(),
        cache_config: CacheConfig::default(),

        // Context shaping
        convert_to_llm: None,
        transform_context: None,
        context_config: None,
        context_translation: None,
        input_filters: Vec::new(),

        // Message injection and turn control
        get_steering_messages: None,
        get_follow_up_messages: None,
        first_turn_trigger: TurnTrigger::User,
        execution_limits: None,

        // Tool execution
        tool_execution: ToolExecutionStrategy::default(),
        tool_timeout: None,

        // Lifecycle hooks: none installed
        before_loop: None,
        after_loop: None,
        before_turn: None,
        after_turn: None,
        on_error: None,
        before_tool_execution: None,
        after_tool_execution: None,
        before_tool_execution_update: None,
        after_tool_execution_update: None,
        before_compaction_start: None,
        after_compaction_end: None,

        // Bookkeeping
        config_id: None,
        prun_pending: None,
        revert_pending: None,
    }
}
/// Returns an empty [`AgentContext`] whose only non-default content is the
/// system prompt `"system"`.
///
/// All identifiers are `None`, every collection is empty, and node numbering
/// starts at `0`.
fn fresh_context() -> AgentContext {
    AgentContext {
        // Prompt material
        system_prompt: "system".into(),
        messages: vec![],
        tools: vec![],
        user_context: vec![],
        inrun_context: vec![],

        // Identity: deliberately unset
        agent_id: None,
        session_id: None,
        loop_id: None,
        parent_loop_id: None,
        continuation_kind: None,
        session: None,

        // Node tracking
        active_node_id: None,
        next_node_id: 0,
    }
}
/// Test-only compaction strategy that records how many times its
/// `keep_compacted` method has been invoked.
struct AwaitingStrategy {
    /// Shared counter incremented once per `keep_compacted` call.
    keep_compacted_calls: Arc<AtomicU32>,
}
/// Strategy whose only productive step is `keep_compacted`, which suspends
/// before producing its result; the other two steps contribute nothing.
#[async_trait::async_trait]
impl BlockCompactionStrategy for AwaitingStrategy {
    /// Never keeps any leading turns.
    async fn keep_first(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
    ) -> Option<TurnRange> {
        None
    }

    /// Never keeps any trailing turns.
    async fn keep_recent(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
    ) -> Option<CompactedSection> {
        None
    }

    /// Sleeps for 5 ms, bumps the call counter, then returns a section
    /// covering turn 0 only with a single user message
    /// `"[Summary] async-built"`. All arguments are ignored.
    async fn keep_compacted(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
        _is_most_recent: bool,
    ) -> Option<CompactedSection> {
        // Yield to the runtime first so the method genuinely awaits before
        // the call is counted.
        tokio::time::sleep(Duration::from_millis(5)).await;
        self.keep_compacted_calls.fetch_add(1, Ordering::SeqCst);

        let range = TurnRange {
            start_turn: 0,
            end_turn: 0,
        };
        let summary = AgentMessage::Llm(LlmMessage::new(Message::user("[Summary] async-built")));

        Some(CompactedSection {
            range,
            messages: vec![summary],
        })
    }
}
/// Runs `compact` on an [`AwaitingStrategy`] and checks that the awaited
/// `keep_compacted` step ran exactly once and that its single user message
/// ended up in the resulting block.
#[tokio::test]
async fn async_block_strategy_dispatches() {
    // Hold a second handle to the counter so it can be read independently of
    // the strategy value.
    let calls = Arc::new(AtomicU32::new(0));
    let strategy = AwaitingStrategy {
        keep_compacted_calls: Arc::clone(&calls),
    };

    let record = make_loop_record("test.model.1", 5);
    let config = CompactionConfig::default();
    let block: CompactionBlock = strategy.compact(&record, &config, true).await;

    let observed_calls = calls.load(Ordering::SeqCst);
    assert_eq!(
        observed_calls, 1,
        "keep_compacted should be invoked exactly once when is_most_recent = true"
    );

    let section = block
        .keep_compacted
        .expect("AwaitingStrategy emits a keep_compacted section");
    let summaries = &section.messages;
    assert_eq!(summaries.len(), 1);
    assert_eq!(summaries[0].role(), "user");
}
/// Pins the output shape of [`DefaultBlockCompaction`] for a 20-turn record
/// with `keep_first_turns = 2` and `keep_recent_turns = 5`.
///
/// Expected partition: turns 0-1 kept first, turns 2-14 compacted into 13
/// user messages each prefixed with `"[Summary] "`, and turns 15-19 kept as
/// recent.
#[tokio::test]
async fn default_block_compaction_byte_compatible() {
    let config = CompactionConfig {
        keep_first_turns: 2,
        keep_recent_turns: 5,
        max_summary_tokens: 2_000,
        ..CompactionConfig::default()
    };
    let record = make_loop_record("test.model.1", 20);
    let block = DefaultBlockCompaction.compact(&record, &config, true).await;

    // Leading section: the first two turns.
    let kf = block.keep_first.expect("keep_first must be present");
    assert_eq!((kf.start_turn, kf.end_turn), (0, 1));

    // Middle section: everything between the kept head and the kept tail.
    let kc = block
        .keep_compacted
        .expect("keep_compacted must be present");
    assert_eq!((kc.range.start_turn, kc.range.end_turn), (2, 14));

    // Trailing section: the last five turns.
    let kr = block.keep_recent.expect("keep_recent must be present");
    assert_eq!((kr.range.start_turn, kr.range.end_turn), (15, 19));

    assert_eq!(
        kc.messages.len(),
        13,
        "expected one summary per middle turn"
    );

    for summary in &kc.messages {
        assert_eq!(summary.role(), "user");

        let llm = match summary {
            AgentMessage::Llm(llm) => llm,
            _ => panic!("expected AgentMessage::Llm for summary entry"),
        };
        let content = match &llm.message {
            Message::User { content, .. } => content,
            _ => panic!("expected Message::User for summary entry"),
        };

        // The first text part carries the summary body.
        let text = content
            .iter()
            .find_map(|part| match part {
                Content::Text { text } => Some(text),
                _ => None,
            })
            .expect("summary message must carry Content::Text");
        assert!(
            text.starts_with("[Summary] "),
            "summary text must start with '[Summary] '; got: {}",
            text
        );
    }
}
/// Checks that a `BeforeCompactionStartFn` hook can await a nested
/// `agent_loop` run (backed by a [`MockProvider`]) before returning its vote.
#[tokio::test]
async fn async_before_compaction_hook_can_call_llm() {
    let completed = Arc::new(AtomicU32::new(0));
    let completed_in_hook = Arc::clone(&completed);

    let hook: phi_core::agent_loop::BeforeCompactionStartFn = Arc::new(move |_tokens, _msgs| {
        // Each invocation gets its own handle so the returned future owns it.
        let completed = Arc::clone(&completed_in_hook);
        Box::pin(async move {
            let config = make_config(Arc::new(MockProvider::text("inner-llm-result")));
            let mut context = fresh_context();
            // `_rx` stays bound (not `_`) so the receiver is not dropped
            // while the inner loop is running.
            let (tx, _rx) = mpsc::unbounded_channel();
            let prompts = vec![AgentMessage::Llm(LlmMessage::new(Message::user("inner")))];

            // The loop's result is irrelevant here; only its completion is.
            let _ = agent_loop(
                prompts,
                &mut context,
                &config,
                tx,
                CancellationToken::new(),
            )
            .await;

            completed.fetch_add(1, Ordering::SeqCst);
            true
        })
    });

    let vote = hook(0, 0).await;
    assert!(
        vote,
        "hook should vote true after inner async work completes"
    );

    let finished_runs = completed.load(Ordering::SeqCst);
    assert_eq!(
        finished_runs, 1,
        "inner LLM call must complete inside the hook before the vote"
    );
}
/// Awaits two invocations of the same sleeping hook via `tokio::join!` and
/// checks that both vote `true` within a wall-clock budget shorter than the
/// two sleeps run back to back.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn async_compaction_hooks_concurrent_runtime() {
    /// How long each hook invocation sleeps.
    const HOOK_DELAY: Duration = Duration::from_millis(50);
    /// Upper bound for both invocations together; two sequential sleeps
    /// would need at least 100 ms.
    const CONCURRENCY_BUDGET: Duration = Duration::from_millis(90);

    let hook: phi_core::agent_loop::BeforeCompactionStartFn = Arc::new(|_tokens, _msgs| {
        Box::pin(async move {
            tokio::time::sleep(HOOK_DELAY).await;
            true
        })
    });

    let started = Instant::now();
    let first = Arc::clone(&hook);
    let second = Arc::clone(&hook);
    let (vote_a, vote_b) = tokio::join!(first(0, 0), second(0, 0));
    let elapsed = started.elapsed();

    assert!(vote_a, "hook A must vote true");
    assert!(vote_b, "hook B must vote true");
    assert!(
        elapsed < CONCURRENCY_BUDGET,
        "expected concurrent execution (< 90ms total), got {:?}",
        elapsed
    );
}