phi-core 0.10.0

Simple, effective agent loop with tool execution and event streaming
Documentation
//! Tests for the 0.9.0 async-trait migration of `BlockCompactionStrategy` and
//! the compaction lifecycle hooks (`BeforeCompactionStartFn` /
//! `AfterCompactionEndFn`).
//!
//! These tests load-bear that:
//! - Custom async trait impls dispatch correctly (awaits flow without
//!   `block_in_place` bridges).
//! - `DefaultBlockCompaction` output remains byte-compatible with the
//!   pre-0.9.0 sync implementation (no semantic regression).
//! - Lifecycle hooks accept async closure bodies and run inside concurrent
//!   tokio tasks without serialising wall-clock work.
//!
//! Pairs with `turn_request_capture_test.rs` to cover both 0.9.0 surfaces
//! (debug-capture + async-trait migration).

use chrono::Utc;
use phi_core::agent_loop::{agent_loop, AgentLoopConfig};
use phi_core::context::{
    BlockCompactionStrategy, CompactedSection, CompactionBlock, CompactionConfig,
    DefaultBlockCompaction, TurnMap, TurnRange,
};
use phi_core::provider::{MockProvider, ModelConfig, StreamProvider};
use phi_core::session::{LoopRecord, LoopStatus};
use phi_core::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

// ---------------------------------------------------------------------------
// Helpers (shape mirrors tests/compaction_test.rs::make_loop_record)
// ---------------------------------------------------------------------------

fn make_loop_record(loop_id: &str, num_turns: u32) -> LoopRecord {
    let mut messages = Vec::new();
    for t in 0..num_turns {
        let tid = Some(TurnId {
            loop_id: loop_id.to_string(),
            turn_index: t,
        });
        messages.push(
            AgentMessage::from(Message::user(format!("Turn {} question", t)))
                .with_turn_id(tid.clone()),
        );
        messages.push(
            AgentMessage::from(Message::Assistant {
                content: vec![Content::Text {
                    text: format!("Turn {} answer with some content that is meaningful", t),
                }],
                stop_reason: StopReason::Stop,
                model: "test".into(),
                provider: "test".into(),
                usage: Usage::default(),
                timestamp: 0,
                error_message: None,
            })
            .with_turn_id(tid.clone()),
        );
    }
    LoopRecord {
        loop_id: loop_id.to_string(),
        session_id: "test-session".to_string(),
        agent_id: "test-agent".to_string(),
        parent_loop_id: None,
        continuation_kind: ContinuationKind::Initial,
        started_at: Utc::now(),
        ended_at: Some(Utc::now()),
        status: LoopStatus::Completed,
        rejection: None,
        config: None,
        messages,
        turns: Vec::new(),
        usage: Usage::default(),
        metadata: None,
        events: Vec::new(),
        children_loop_ids: Vec::new(),
        child_loop_refs: Vec::new(),
        parallel_group: None,
        compaction_block: None,
    }
}

fn make_config(provider: Arc<dyn StreamProvider>) -> AgentLoopConfig {
    AgentLoopConfig {
        model_config: ModelConfig::anthropic("mock", "mock", "test"),
        provider_override: Some(provider),
        thinking_level: ThinkingLevel::Off,
        max_tokens: None,
        temperature: None,
        convert_to_llm: None,
        transform_context: None,
        get_steering_messages: None,
        get_follow_up_messages: None,
        context_config: None,
        execution_limits: None,
        cache_config: CacheConfig::default(),
        tool_execution: ToolExecutionStrategy::default(),
        tool_timeout: None,
        response_format: phi_core::provider::ResponseFormat::Text,
        retry_config: phi_core::RetryConfig::default(),
        before_turn: None,
        after_turn: None,
        on_error: None,
        before_loop: None,
        after_loop: None,
        before_tool_execution: None,
        after_tool_execution: None,
        before_tool_execution_update: None,
        after_tool_execution_update: None,
        before_compaction_start: None,
        after_compaction_end: None,
        input_filters: vec![],
        first_turn_trigger: TurnTrigger::User,
        config_id: None,
        context_translation: None,
        prun_pending: None,
        revert_pending: None,
        current_tool: None,
        revert_render_policy: phi_core::RevertRenderPolicy::default(),
    }
}

fn fresh_context() -> AgentContext {
    AgentContext {
        system_prompt: "system".into(),
        messages: Vec::new(),
        tools: Vec::new(),
        agent_id: None,
        session_id: None,
        loop_id: None,
        parent_loop_id: None,
        continuation_kind: None,
        session: None,
        user_context: Vec::new(),
        inrun_context: Vec::new(),
        active_node_id: None,
        next_node_id: 0,
    }
}

// ---------------------------------------------------------------------------
// 1. async_block_strategy_dispatches
// ---------------------------------------------------------------------------

/// Custom strategy whose async methods actually `.await` real async work
/// (a `tokio::time::sleep`). Verifies that the trait's `async fn` shape
/// dispatches and runs inside a tokio runtime without `block_in_place`
/// workarounds.
struct AwaitingStrategy {
    keep_compacted_calls: Arc<AtomicU32>,
}

#[async_trait::async_trait]
impl BlockCompactionStrategy for AwaitingStrategy {
    async fn keep_first(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
    ) -> Option<TurnRange> {
        None
    }

    async fn keep_recent(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
    ) -> Option<CompactedSection> {
        None
    }

    async fn keep_compacted(
        &self,
        _record: &LoopRecord,
        _turn_map: &TurnMap,
        _config: &CompactionConfig,
        _is_most_recent: bool,
    ) -> Option<CompactedSection> {
        // Real async work — sleep for 5ms.
        tokio::time::sleep(Duration::from_millis(5)).await;
        self.keep_compacted_calls.fetch_add(1, Ordering::SeqCst);
        Some(CompactedSection {
            range: TurnRange {
                start_turn: 0,
                end_turn: 0,
            },
            messages: vec![AgentMessage::Llm(LlmMessage::new(Message::user(
                "[Summary] async-built",
            )))],
        })
    }
}

#[tokio::test]
async fn async_block_strategy_dispatches() {
    let strategy = AwaitingStrategy {
        keep_compacted_calls: Arc::new(AtomicU32::new(0)),
    };
    let record = make_loop_record("test.model.1", 5);
    let config = CompactionConfig::default();

    let block: CompactionBlock = strategy.compact(&record, &config, true).await;

    assert_eq!(
        strategy.keep_compacted_calls.load(Ordering::SeqCst),
        1,
        "keep_compacted should be invoked exactly once when is_most_recent = true"
    );
    let compacted = block
        .keep_compacted
        .expect("AwaitingStrategy emits a keep_compacted section");
    assert_eq!(compacted.messages.len(), 1);
    assert_eq!(compacted.messages[0].role(), "user");
}

// ---------------------------------------------------------------------------
// 2. default_block_compaction_byte_compatible
// ---------------------------------------------------------------------------

/// Golden round-trip: `DefaultBlockCompaction.compact(..., is_most_recent=true)`
/// over a 20-turn fixture must produce a stable shape (keep_first / keep_recent /
/// keep_compacted ranges and per-turn one-liner summary count). The pre-0.9.0
/// sync impl produced this exact shape; the 0.9.0 async impl bodies are
/// unchanged so the contract is byte-equal.
#[tokio::test]
async fn default_block_compaction_byte_compatible() {
    let record = make_loop_record("test.model.1", 20);
    let config = CompactionConfig {
        keep_first_turns: 2,
        keep_recent_turns: 5,
        max_summary_tokens: 2_000,
        ..CompactionConfig::default()
    };

    let block = DefaultBlockCompaction.compact(&record, &config, true).await;

    // keep_first: turns 0..=1.
    let kf = block.keep_first.expect("keep_first must be present");
    assert_eq!(kf.start_turn, 0);
    assert_eq!(kf.end_turn, 1);

    // keep_compacted: middle 2..=14.
    let kc = block
        .keep_compacted
        .expect("keep_compacted must be present");
    assert_eq!(kc.range.start_turn, 2);
    assert_eq!(kc.range.end_turn, 14);

    // keep_recent: turns 15..=19.
    let kr = block.keep_recent.expect("keep_recent must be present");
    assert_eq!(kr.range.start_turn, 15);
    assert_eq!(kr.range.end_turn, 19);

    // keep_compacted produces one user-role "[Summary] ..." line per assistant
    // message in the middle range (turns 2..=14 → 13 assistant messages → 13
    // summary entries). The pre-0.9.0 impl produced this exact count.
    assert_eq!(
        kc.messages.len(),
        13,
        "expected one summary per middle turn"
    );
    for m in &kc.messages {
        assert_eq!(m.role(), "user");
        if let AgentMessage::Llm(lm) = m {
            if let Message::User { content, .. } = &lm.message {
                let text = content
                    .iter()
                    .find_map(|c| match c {
                        Content::Text { text } => Some(text.clone()),
                        _ => None,
                    })
                    .expect("summary message must carry Content::Text");
                assert!(
                    text.starts_with("[Summary] "),
                    "summary text must start with '[Summary] '; got: {}",
                    text
                );
            } else {
                panic!("expected Message::User for summary entry");
            }
        } else {
            panic!("expected AgentMessage::Llm for summary entry");
        }
    }
}

// ---------------------------------------------------------------------------
// 3. async_before_compaction_hook_can_call_llm
// ---------------------------------------------------------------------------

/// The 0.9.0 `BeforeCompactionStartFn` is async — verify that the hook body
/// can perform await work (here: a MockProvider stream call via a side-channel
/// agent_loop) and still vote `true` to allow compaction to proceed.
///
/// We exercise the hook via a direct invocation rather than wiring it through
/// the agent loop's compaction trigger (which requires very precise context-
/// budget shaping); the goal of this test is to prove the hook's type shape
/// admits async bodies and that the agent loop awaits them correctly.
#[tokio::test]
async fn async_before_compaction_hook_can_call_llm() {
    // Tracks whether the hook completed an inner async call before voting.
    let inner_call_completed = Arc::new(AtomicU32::new(0));
    let inner_call_completed_clone = inner_call_completed.clone();

    // Build an async BeforeCompactionStartFn that awaits an inner agent_loop
    // call (which streams a MockProvider response) before returning `true`.
    let hook: phi_core::agent_loop::BeforeCompactionStartFn = Arc::new(move |_tokens, _msgs| {
        let counter = inner_call_completed_clone.clone();
        Box::pin(async move {
            // Inner LLM call — proves the hook body can perform real async
            // provider streaming without a `block_in_place` bridge.
            let inner_provider = MockProvider::text("inner-llm-result");
            let inner_config = make_config(Arc::new(inner_provider));
            let mut inner_ctx = fresh_context();
            let (tx, _rx) = mpsc::unbounded_channel();
            let cancel = CancellationToken::new();
            let prompt = AgentMessage::Llm(LlmMessage::new(Message::user("inner")));
            let _ = agent_loop(vec![prompt], &mut inner_ctx, &inner_config, tx, cancel).await;
            counter.fetch_add(1, Ordering::SeqCst);
            true
        })
    });

    // Invoke the hook directly to confirm async-body semantics.
    let allowed = hook(0, 0).await;
    assert!(
        allowed,
        "hook should vote true after inner async work completes"
    );
    assert_eq!(
        inner_call_completed.load(Ordering::SeqCst),
        1,
        "inner LLM call must complete inside the hook before the vote"
    );
}

// ---------------------------------------------------------------------------
// 4. async_compaction_hooks_concurrent_runtime
// ---------------------------------------------------------------------------

/// Spawn two parallel hook invocations whose bodies sleep for 50ms each;
/// assert wall-clock concurrency (< 100ms total) — proves the tokio runtime
/// drives both async hook bodies concurrently rather than serialising them.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn async_compaction_hooks_concurrent_runtime() {
    let hook: phi_core::agent_loop::BeforeCompactionStartFn = Arc::new(|_tokens, _msgs| {
        Box::pin(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            true
        })
    });

    let start = Instant::now();
    let hook_a = hook.clone();
    let hook_b = hook.clone();
    let (a, b) = tokio::join!(hook_a(0, 0), hook_b(0, 0));
    let elapsed = start.elapsed();

    assert!(a, "hook A must vote true");
    assert!(b, "hook B must vote true");
    // Both sleep 50ms in parallel; serialised would be ~100ms+; allow some slack
    // for scheduler jitter but keep well below 100ms.
    assert!(
        elapsed < Duration::from_millis(90),
        "expected concurrent execution (< 90ms total), got {:?}",
        elapsed
    );
}