holon 0.14.1

A headless, event-driven runtime for long-lived agents
Documentation
use super::*;

use std::collections::BTreeMap;

use crate::types::{OperatorMessageRecord, OperatorMessageStatus};

const OPERATOR_MESSAGE_SCAN_MIN: usize = 256;
const OPERATOR_MESSAGE_SCAN_HEADROOM: usize = 16;

impl RuntimeHandle {
    pub async fn recent_briefs(&self, limit: usize) -> Result<Vec<BriefRecord>> {
        self.inner.storage.read_recent_briefs(limit)
    }

    pub async fn recent_operator_messages(
        &self,
        limit: usize,
    ) -> Result<Vec<OperatorMessageRecord>> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        let state = {
            let guard = self.inner.agent.lock().await;
            guard.state.clone()
        };
        let scan_limit = limit
            .saturating_mul(OPERATOR_MESSAGE_SCAN_HEADROOM)
            .max(OPERATOR_MESSAGE_SCAN_MIN);
        let messages_by_id = self
            .inner
            .storage
            .read_recent_messages(scan_limit)?
            .into_iter()
            .filter(|message| message.kind == MessageKind::OperatorPrompt)
            .map(|message| (message.id.clone(), message))
            .collect::<BTreeMap<_, _>>();

        let mut latest_queue_entries = BTreeMap::new();
        for entry in self.inner.storage.read_recent_queue_entries(scan_limit)? {
            latest_queue_entries.insert(entry.message_id.clone(), entry);
        }

        let mut records = latest_queue_entries
            .values()
            .filter_map(|entry| {
                let message = messages_by_id.get(&entry.message_id)?;
                Some(OperatorMessageRecord {
                    message_id: entry.message_id.clone(),
                    agent_id: entry.agent_id.clone(),
                    status: operator_message_status(&entry.status, &entry.priority, &state),
                    created_at: entry.created_at,
                    updated_at: entry.updated_at,
                    body: message.body.clone(),
                    error: None,
                })
            })
            .collect::<Vec<_>>();
        records.sort_by(|left, right| {
            left.created_at
                .cmp(&right.created_at)
                .then_with(|| left.message_id.cmp(&right.message_id))
        });
        if records.len() > limit {
            records.drain(0..records.len() - limit);
        }
        Ok(records)
    }

    pub(super) async fn persist_brief(&self, brief: &BriefRecord) -> Result<()> {
        let mut bound_brief = brief.clone();
        {
            let guard = self.inner.agent.lock().await;
            bound_brief.workspace_id = guard
                .state
                .active_workspace_entry
                .as_ref()
                .map(|entry| entry.workspace_id.clone())
                .unwrap_or_else(|| crate::types::AGENT_HOME_WORKSPACE_ID.to_string());
            if bound_brief.work_item_id.is_none() {
                bound_brief.work_item_id = guard.state.current_turn_work_item_id.clone();
            }
        }
        self.inner.storage.append_brief(&bound_brief)?;
        self.inner.storage.append_event(&AuditEvent::new(
            "brief_created",
            to_json_value(&bound_brief),
        ))?;
        let mut guard = self.inner.agent.lock().await;
        guard.state.last_brief_at = Some(bound_brief.created_at);
        self.inner.storage.write_agent(&guard.state)?;
        Ok(())
    }
}

fn operator_message_status(
    status: &QueueEntryStatus,
    priority: &Priority,
    state: &AgentState,
) -> OperatorMessageStatus {
    match status {
        QueueEntryStatus::Queued
            if *priority == Priority::Interject && state.current_run_id.is_some() =>
        {
            OperatorMessageStatus::WaitingForSafePoint
        }
        QueueEntryStatus::Queued => OperatorMessageStatus::Queued,
        QueueEntryStatus::Dequeued | QueueEntryStatus::Interjected => {
            OperatorMessageStatus::Processing
        }
        QueueEntryStatus::Processed => OperatorMessageStatus::Processed,
        QueueEntryStatus::Aborted => OperatorMessageStatus::Failed,
        QueueEntryStatus::Dropped => OperatorMessageStatus::Dropped,
    }
}