mentra 0.6.0

An agent runtime for tool-using LLM applications
Documentation
use std::{
    path::{Path, PathBuf},
    sync::Arc,
    time::Instant,
};

use tokio::sync::{Mutex as AsyncMutex, mpsc};

use crate::{
    Agent, ContentBlock, agent::TeamAutonomyConfig, error::RuntimeError, runtime::CancellationToken,
};

use super::{TeamManager, TeamMemberStatus};

const TEAM_WAKE_PROMPT: &str = "Process any new team inbox messages and continue your work.";
const BACKGROUND_WAKE_PROMPT: &str =
    "Review any completed background task results and continue your work.";

pub(crate) async fn teammate_actor_loop(
    manager: TeamManager,
    team_dir: PathBuf,
    teammate_name: String,
    agent: Arc<AsyncMutex<Agent>>,
    mut wake_rx: mpsc::UnboundedReceiver<()>,
    cancellation: CancellationToken,
) {
    let autonomy = {
        let guard = agent.lock().await;
        guard.config().team.autonomy.clone()
    };
    let mut should_process = false;
    let mut idle_since = None;

    loop {
        if cancellation.is_cancelled() {
            break;
        }

        if should_process {
            match process_pending_work(&manager, &team_dir, &teammate_name, &agent).await {
                Ok(ActorState::Idle) => {
                    idle_since.get_or_insert_with(Instant::now);
                }
                Ok(ActorState::Shutdown) => break,
                Err(()) => {
                    idle_since.get_or_insert_with(Instant::now);
                }
            }
            should_process = false;
        }

        if cancellation.is_cancelled() {
            break;
        }

        if autonomy.enabled {
            let started_idle_at = idle_since.unwrap_or_else(|| {
                let now = Instant::now();
                idle_since = Some(now);
                now
            });

            let wait = tokio::time::sleep(autonomy.poll_interval);
            tokio::pin!(wait);

            tokio::select! {
                wake = wake_rx.recv() => {
                    match wake {
                        Some(()) => {
                            should_process = true;
                            idle_since = None;
                        }
                        None => break,
                    }
                }
                _ = &mut wait => {
                    match autonomy_tick(
                        &manager,
                        &team_dir,
                        &teammate_name,
                        &agent,
                        started_idle_at,
                        &autonomy,
                    ).await {
                        Ok(AutonomyState::ContinueIdle) => {}
                        Ok(AutonomyState::Claimed(prompt)) => {
                            if execute_prompt(&manager, &team_dir, &teammate_name, &agent, prompt)
                                .await
                                .is_err()
                            {
                                idle_since.get_or_insert_with(Instant::now);
                                continue;
                            }
                            let _ = manager.update_member_status(
                                &team_dir,
                                &teammate_name,
                                TeamMemberStatus::Idle,
                            );
                            should_process = true;
                            idle_since = Some(Instant::now());
                        }
                        Ok(AutonomyState::Shutdown) => break,
                        Err(()) => {
                            idle_since.get_or_insert_with(Instant::now);
                        }
                    }
                }
            }
        } else {
            match wake_rx.recv().await {
                Some(()) => {
                    should_process = true;
                    idle_since = None;
                }
                None => break,
            }
        }
    }

    let _ = manager.unregister_teammate_actor(&team_dir, &teammate_name);
}

enum ActorState {
    Idle,
    Shutdown,
}

enum PendingWork {
    Prompt(String),
    Idle,
    Shutdown,
}

enum AutonomyState {
    ContinueIdle,
    Claimed(String),
    Shutdown,
}

async fn process_pending_work(
    manager: &TeamManager,
    team_dir: &Path,
    teammate_name: &str,
    agent: &Arc<AsyncMutex<Agent>>,
) -> Result<ActorState, ()> {
    let mut processed_prompt = false;

    loop {
        match next_pending_work(manager, team_dir, teammate_name, agent).await {
            Ok(PendingWork::Prompt(prompt)) => {
                processed_prompt = true;
                execute_prompt(manager, team_dir, teammate_name, agent, prompt).await?
            }
            Ok(PendingWork::Idle) => {
                if processed_prompt {
                    let _ = manager.update_member_status(
                        team_dir,
                        teammate_name,
                        TeamMemberStatus::Idle,
                    );
                }
                return Ok(ActorState::Idle);
            }
            Ok(PendingWork::Shutdown) => {
                let _ = manager.update_member_status(
                    team_dir,
                    teammate_name,
                    TeamMemberStatus::Shutdown,
                );
                return Ok(ActorState::Shutdown);
            }
            Err(error) => {
                let _ = mark_failed(manager, team_dir, teammate_name, error);
                return Err(());
            }
        }
    }
}

async fn next_pending_work(
    manager: &TeamManager,
    team_dir: &Path,
    teammate_name: &str,
    agent: &Arc<AsyncMutex<Agent>>,
) -> Result<PendingWork, RuntimeError> {
    if manager.has_pending_messages(team_dir, teammate_name)? {
        return Ok(PendingWork::Prompt(TEAM_WAKE_PROMPT.to_string()));
    }

    let has_background_notifications = {
        let guard = agent.lock().await;
        guard
            .runtime_handle()
            .has_deliverable_background_notifications(guard.id())
    };
    if has_background_notifications {
        return Ok(PendingWork::Prompt(BACKGROUND_WAKE_PROMPT.to_string()));
    }

    if manager.take_shutdown_signal(team_dir, teammate_name)? {
        return Ok(PendingWork::Shutdown);
    }

    Ok(PendingWork::Idle)
}

async fn autonomy_tick(
    manager: &TeamManager,
    team_dir: &Path,
    teammate_name: &str,
    agent: &Arc<AsyncMutex<Agent>>,
    idle_since: Instant,
    autonomy: &TeamAutonomyConfig,
) -> Result<AutonomyState, ()> {
    match manager.take_shutdown_signal(team_dir, teammate_name) {
        Ok(true) => {
            let _ =
                manager.update_member_status(team_dir, teammate_name, TeamMemberStatus::Shutdown);
            return Ok(AutonomyState::Shutdown);
        }
        Ok(false) => {}
        Err(error) => {
            let _ = mark_failed(manager, team_dir, teammate_name, error);
            return Err(());
        }
    }

    let claimed = {
        let mut guard = agent.lock().await;
        match guard.try_claim_ready_task() {
            Ok(task) => task,
            Err(error) => {
                let _ = mark_failed(manager, team_dir, teammate_name, error);
                return Err(());
            }
        }
    };
    if let Some(task) = claimed {
        let task_body = if task.description.trim().is_empty() {
            format!("Task #{}: {}", task.id, task.subject)
        } else {
            format!(
                "Task #{}: {}\nDescription: {}",
                task.id, task.subject, task.description
            )
        };
        return Ok(AutonomyState::Claimed(format!(
            "<auto-claimed>{task_body}</auto-claimed>\n<reminder>Update your task status. Mark it in_progress when you start and completed when you finish.</reminder>"
        )));
    }

    if idle_since.elapsed() >= autonomy.idle_timeout {
        let _ = manager.update_member_status(team_dir, teammate_name, TeamMemberStatus::Shutdown);
        return Ok(AutonomyState::Shutdown);
    }

    Ok(AutonomyState::ContinueIdle)
}

async fn execute_prompt(
    manager: &TeamManager,
    team_dir: &Path,
    teammate_name: &str,
    agent: &Arc<AsyncMutex<Agent>>,
    prompt: String,
) -> Result<(), ()> {
    let _ = manager.update_member_status(team_dir, teammate_name, TeamMemberStatus::Working);
    let result = {
        let mut guard = agent.lock().await;
        guard.send(vec![ContentBlock::Text { text: prompt }]).await
    };

    match result {
        Ok(_) | Err(RuntimeError::EmptyAssistantResponse) => Ok(()),
        Err(error) => {
            let _ = mark_failed(manager, team_dir, teammate_name, error);
            Err(())
        }
    }
}

fn mark_failed(
    manager: &TeamManager,
    team_dir: &Path,
    teammate_name: &str,
    error: RuntimeError,
) -> Result<(), RuntimeError> {
    manager.update_member_status(
        team_dir,
        teammate_name,
        TeamMemberStatus::Failed(error.to_string()),
    )
}