// mentra 0.5.0
//
// An agent runtime for tool-using LLM applications
// Documentation
use std::{
    fs,
    path::PathBuf,
    sync::atomic::{AtomicU64, Ordering},
    time::{SystemTime, UNIX_EPOCH},
};

use crate::{
    BuiltinProvider, ContentBlock, Message, Role,
    agent::{AgentConfig, AgentEvent, CompactionConfig, CompactionTrigger},
    provider::{ContentBlockDelta, ContentBlockStart, ProviderEvent, Request},
    runtime::Runtime,
};

use super::support::{ScriptedProvider, StaticTool, StreamScript, model_info, ok_stream};

/// Checks that micro-compaction rewrites only the older tool results in outgoing
/// requests while the agent's stored history keeps the full tool output.
///
/// The scripted provider replies with four `echo_tool` calls followed by a final
/// text message, and the agent is configured with `keep_recent_tool_results: 2`.
/// The fifth recorded request is expected to carry the placeholder
/// `[Previous: used echo_tool]` for the two oldest tool results and the full
/// output for the two most recent ones.
#[tokio::test]
async fn micro_compaction_only_rewrites_old_tool_results_in_requests() {
    let model = model_info("model", BuiltinProvider::Anthropic);
    // 140-character payload handed to `StaticTool::success` as the tool's output.
    let long_output = "x".repeat(140);
    let provider = ScriptedProvider::new(
        BuiltinProvider::Anthropic,
        vec![model.clone()],
        vec![
            tool_use_stream(&model.id, "tool-1", "echo_tool", r#"{"value":"one"}"#),
            tool_use_stream(&model.id, "tool-2", "echo_tool", r#"{"value":"two"}"#),
            tool_use_stream(&model.id, "tool-3", "echo_tool", r#"{"value":"three"}"#),
            tool_use_stream(&model.id, "tool-4", "echo_tool", r#"{"value":"four"}"#),
            text_stream(&model.id, "done"),
        ],
    );
    // Keep a clone so the requests recorded by the provider can be inspected
    // after the provider itself has been moved into the runtime.
    let provider_handle = provider.clone();

    let runtime = Runtime::empty_builder()
        .with_provider_instance(provider)
        .with_tool(StaticTool::success("echo_tool", &long_output))
        .build()
        .expect("build runtime");
    let mut agent = runtime
        .spawn_with_config(
            "agent",
            model,
            AgentConfig {
                compaction: CompactionConfig {
                    keep_recent_tool_results: 2,
                    // NOTE(review): `None` presumably disables automatic
                    // compaction so only micro-compaction is exercised — confirm.
                    auto_compact_threshold_tokens: None,
                    ..Default::default()
                },
                ..Default::default()
            },
        )
        .unwrap();

    agent
        .send(vec![ContentBlock::Text {
            text: "hello".to_string(),
        }])
        .await
        .unwrap();

    // History entry 2 is expected to be the first tool result, still holding
    // the full output rather than a placeholder.
    assert_eq!(
        agent.history()[2],
        Message::user(ContentBlock::ToolResult {
            tool_use_id: "tool-1".to_string(),
            content: long_output.clone().into(),
            is_error: false,
        })
    );

    // One request per scripted response: four tool-use turns plus the final text.
    let requests = provider_handle.recorded_requests().await;
    assert_eq!(requests.len(), 5);
    // In the last request the two oldest tool results are replaced by the
    // placeholder and the two most recent keep their full content.
    let final_tool_results = tool_result_contents(&requests[4]);
    assert_eq!(
        final_tool_results,
        vec![
            "[Previous: used echo_tool]".to_string(),
            "[Previous: used echo_tool]".to_string(),
            long_output.clone(),
            long_output,
        ]
    );
}

/// Checks that automatic compaction writes a transcript file and rewrites the
/// agent's history around a compaction summary.
///
/// The provider is scripted with three text replies ("first done", "summary",
/// "second done") and the agent is configured with
/// `auto_compact_threshold_tokens: Some(1)` and a fresh temporary transcript
/// directory. After two user sends the test inspects the history, the transcript
/// directory, the recorded requests, and the emitted `ContextCompacted` event.
#[tokio::test]
async fn auto_compaction_persists_transcript_and_rewrites_history() {
    let model = model_info("model", BuiltinProvider::Anthropic);
    let provider = ScriptedProvider::new(
        BuiltinProvider::Anthropic,
        vec![model.clone()],
        vec![
            text_stream(&model.id, "first done"),
            text_stream(&model.id, "summary"),
            text_stream(&model.id, "second done"),
        ],
    );
    // Keep a clone so recorded requests can be read after the provider is moved.
    let provider_handle = provider.clone();
    let transcript_dir = temp_dir("auto-compact");

    let runtime = Runtime::empty_builder()
        .with_provider_instance(provider)
        .build()
        .expect("build runtime");
    let mut agent = runtime
        .spawn_with_config(
            "agent",
            model,
            AgentConfig {
                compaction: CompactionConfig {
                    // NOTE(review): a 1-token threshold is presumably low enough
                    // that compaction triggers on the second send — confirm.
                    auto_compact_threshold_tokens: Some(1),
                    transcript_dir: transcript_dir.clone(),
                    ..Default::default()
                },
                ..Default::default()
            },
        )
        .unwrap();
    // Subscribe before sending so events emitted during the sends are buffered.
    let mut events = agent.subscribe_events();

    agent
        .send(vec![ContentBlock::Text {
            text: "first".to_string(),
        }])
        .await
        .unwrap();
    agent
        .send(vec![ContentBlock::Text {
            text: "second".to_string(),
        }])
        .await
        .unwrap();

    // Expected history: the original first user message, followed by a message
    // containing the compaction summary built from the scripted "summary" reply.
    assert_eq!(agent.history().len(), 4);
    assert_eq!(agent.history()[0].role, Role::User);
    assert_eq!(message_text(&agent.history()[0]), "first");
    assert!(message_text(&agent.history()[1]).contains("[Compaction summary]"));
    assert!(message_text(&agent.history()[1]).contains("Progress: summary"));

    // Exactly one transcript file is expected in the configured directory.
    let transcripts = fs::read_dir(&transcript_dir)
        .expect("read transcript dir")
        .map(|entry| entry.expect("read transcript entry").path())
        .collect::<Vec<_>>();
    assert_eq!(transcripts.len(), 1);

    // The transcript is expected to contain three lines.
    let transcript = fs::read_to_string(&transcripts[0]).expect("read transcript");
    assert_eq!(transcript.lines().count(), 3);

    // Three requests, matching the three scripted replies. The second one is
    // expected to carry no tools and no tool choice.
    // NOTE(review): the second request is presumably the summarization call — confirm.
    let requests = provider_handle.recorded_requests().await;
    assert_eq!(requests.len(), 3);
    assert!(requests[1].tools.is_empty());
    assert_eq!(requests[1].tool_choice, None);
    // The third request starts with the preserved first user message and
    // includes the summary text somewhere in its messages.
    assert_eq!(message_text(&requests[2].messages[0]), "first");
    assert!(
        requests[2]
            .messages
            .iter()
            .any(|message| message_text(message).contains("Progress: summary"))
    );

    // Take the first `ContextCompacted` event from the buffered events and
    // check the reported compaction details.
    let compaction = collect_events(&mut events)
        .into_iter()
        .find_map(|event| match event {
            AgentEvent::ContextCompacted { details } => Some(details),
            _ => None,
        })
        .expect("expected compaction event");
    assert_eq!(compaction.trigger, CompactionTrigger::Auto);
    assert_eq!(compaction.replaced_items, 2);
    assert_eq!(compaction.preserved_items, 1);
    assert_eq!(compaction.preserved_user_turns, 1);
    assert_eq!(compaction.preserved_delegation_results, 0);
    assert_eq!(compaction.resulting_transcript_len, 3);
    assert!(compaction.transcript_path.starts_with(&transcript_dir));
}

/// Checks that a model-issued `compact` tool call compacts the history and that
/// the agent then continues the turn.
///
/// The provider is scripted with a `compact` tool call, a "summary" text reply,
/// and an "after compact" text reply. Automatic compaction is configured with
/// `auto_compact_threshold_tokens: None`. After a single user send the test
/// inspects the history, the recorded requests, and the emitted
/// `ContextCompacted` event, which is expected to report a manual trigger.
#[tokio::test]
async fn compact_tool_compacts_history_and_continues() {
    let model = model_info("model", BuiltinProvider::Anthropic);
    let provider = ScriptedProvider::new(
        BuiltinProvider::Anthropic,
        vec![model.clone()],
        vec![
            tool_use_stream(&model.id, "compact-1", "compact", "{}"),
            text_stream(&model.id, "summary"),
            text_stream(&model.id, "after compact"),
        ],
    );
    // Keep a clone so recorded requests can be read after the provider is moved.
    let provider_handle = provider.clone();
    let transcript_dir = temp_dir("manual-compact");

    // Uses `Runtime::builder()` rather than `empty_builder()`; the test later
    // asserts that the first request advertises a tool named "compact".
    // NOTE(review): presumably the default builder registers that tool — confirm.
    let runtime = Runtime::builder()
        .with_provider_instance(provider)
        .build()
        .expect("build runtime");
    let mut agent = runtime
        .spawn_with_config(
            "agent",
            model,
            AgentConfig {
                compaction: CompactionConfig {
                    auto_compact_threshold_tokens: None,
                    transcript_dir,
                    ..Default::default()
                },
                ..Default::default()
            },
        )
        .unwrap();
    // Subscribe before sending so events emitted during the send are buffered.
    let mut events = agent.subscribe_events();

    agent
        .send(vec![ContentBlock::Text {
            text: "please compact".to_string(),
        }])
        .await
        .unwrap();

    // Expected history: the original user message, then a message containing
    // the compaction summary; entry 3 holds the successful tool result whose
    // text reports where the transcript was saved.
    assert_eq!(agent.history().len(), 5);
    assert_eq!(message_text(&agent.history()[0]), "please compact");
    assert!(message_text(&agent.history()[1]).contains("[Compaction summary]"));
    assert!(message_text(&agent.history()[1]).contains("Progress: summary"));
    assert!(matches!(
        &agent.history()[3].content[0],
        ContentBlock::ToolResult { is_error: false, content, .. }
            if content.starts_with("Context compacted. Transcript saved to ")
    ));

    // Three requests, matching the three scripted replies. The second one is
    // expected to carry no tools and no tool choice.
    // NOTE(review): the second request is presumably the summarization call — confirm.
    let requests = provider_handle.recorded_requests().await;
    assert_eq!(requests.len(), 3);
    assert!(requests[1].tools.is_empty());
    assert_eq!(requests[1].tool_choice, None);
    // The third request starts with the preserved user message and includes
    // the summary text somewhere in its messages.
    assert_eq!(message_text(&requests[2].messages[0]), "please compact");
    assert!(
        requests[2]
            .messages
            .iter()
            .any(|message| message_text(message).contains("Progress: summary"))
    );
    assert!(tool_names(&requests[0]).contains("compact"));

    // Take the first `ContextCompacted` event from the buffered events and
    // check the reported compaction details.
    let compaction = collect_events(&mut events)
        .into_iter()
        .find_map(|event| match event {
            AgentEvent::ContextCompacted { details } => Some(details),
            _ => None,
        })
        .expect("expected compaction event");
    assert_eq!(compaction.trigger, CompactionTrigger::Manual);
    assert_eq!(compaction.replaced_items, 1);
    assert_eq!(compaction.preserved_items, 1);
    assert_eq!(compaction.preserved_user_turns, 1);
    assert_eq!(compaction.preserved_delegation_results, 0);
    assert_eq!(compaction.resulting_transcript_len, 3);
}

/// Builds a scripted stream for one assistant message that contains a single
/// text block.
///
/// The message id is `msg-` followed by `text`, `model` is reported as the
/// message's model, and the whole of `text` is delivered in one delta at
/// block index 0.
fn text_stream(model: &str, text: &str) -> StreamScript {
    let message_started = ProviderEvent::MessageStarted {
        id: format!("msg-{text}"),
        model: model.to_owned(),
        role: Role::Assistant,
    };
    let block_started = ProviderEvent::ContentBlockStarted {
        index: 0,
        kind: ContentBlockStart::Text,
    };
    let block_delta = ProviderEvent::ContentBlockDelta {
        index: 0,
        delta: ContentBlockDelta::Text(text.to_owned()),
    };
    let block_stopped = ProviderEvent::ContentBlockStopped { index: 0 };

    // Event order: message start, block start, delta, block stop, message stop.
    let events = vec![
        message_started,
        block_started,
        block_delta,
        block_stopped,
        ProviderEvent::MessageStopped,
    ];
    ok_stream(events)
}

/// Builds a scripted stream for one assistant message that contains a single
/// tool-use block.
///
/// The message id is `msg-` followed by `id`. The tool-use block carries `id`
/// and `name`, and `input_json` is delivered as a single input-JSON delta at
/// block index 0.
fn tool_use_stream(model: &str, id: &str, name: &str, input_json: &str) -> StreamScript {
    let message_started = ProviderEvent::MessageStarted {
        id: format!("msg-{id}"),
        model: model.to_owned(),
        role: Role::Assistant,
    };
    let block_started = ProviderEvent::ContentBlockStarted {
        index: 0,
        kind: ContentBlockStart::ToolUse {
            id: id.to_owned(),
            name: name.to_owned(),
        },
    };
    let block_delta = ProviderEvent::ContentBlockDelta {
        index: 0,
        delta: ContentBlockDelta::ToolUseInputJson(input_json.to_owned()),
    };
    let block_stopped = ProviderEvent::ContentBlockStopped { index: 0 };

    // Event order: message start, block start, delta, block stop, message stop.
    let events = vec![
        message_started,
        block_started,
        block_delta,
        block_stopped,
        ProviderEvent::MessageStopped,
    ];
    ok_stream(events)
}

/// Collects the display string of every tool-result block in `request`.
///
/// Messages are visited in order, then the blocks inside each message in
/// order. Blocks that are not tool results are ignored.
fn tool_result_contents(request: &Request<'_>) -> Vec<String> {
    let mut rendered = Vec::new();
    for message in request.messages.iter() {
        for block in message.content.iter() {
            if let ContentBlock::ToolResult { content, .. } = block {
                rendered.push(content.to_display_string());
            }
        }
    }
    rendered
}

/// Returns the set of tool names listed in `request.tools`.
fn tool_names(request: &Request<'_>) -> std::collections::HashSet<String> {
    let mut names = std::collections::HashSet::new();
    for tool in request.tools.iter() {
        names.insert(tool.name.clone());
    }
    names
}

/// Returns the text of the first text block in `message`, or an empty string
/// when the message has no text block.
fn message_text(message: &Message) -> &str {
    for block in message.content.iter() {
        if let ContentBlock::Text { text } = block {
            return text.as_str();
        }
    }
    ""
}

fn collect_events(receiver: &mut tokio::sync::broadcast::Receiver<AgentEvent>) -> Vec<AgentEvent> {
    let mut events = Vec::new();
    while let Ok(event) = receiver.try_recv() {
        events.push(event);
    }
    events
}

/// Counter that gives each directory created by `temp_dir` a distinct suffix
/// within this process. Starts at 1.
static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(1);

/// Creates a new directory under the system temporary directory and returns
/// its path.
///
/// The directory name is built from a fixed prefix, `label`, the current time
/// in nanoseconds since the Unix epoch, and the next value of `NEXT_TEMP_ID`.
///
/// # Panics
///
/// Panics if the system clock is before the Unix epoch or if the directory
/// cannot be created.
fn temp_dir(label: &str) -> PathBuf {
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time")
        .as_nanos();
    let unique = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!("mentra-runtime-compact-{label}-{timestamp}-{unique}");

    let mut path = std::env::temp_dir();
    path.push(dir_name);
    fs::create_dir_all(&path).expect("create temp dir");
    path
}