aidaemon 0.11.9

A personal AI agent that runs as a background daemon, accessible via Telegram, Slack, or Discord, with tool use, MCP integration, and persistent memory
Documentation
use super::*;

pub(in crate::agent) async fn build_resume_checkpoint(
    agent: &Agent,
    session_id: &str,
) -> anyhow::Result<Option<ResumeCheckpoint>> {
    let Some(active_task_event) = agent.event_store.get_active_task(session_id).await? else {
        return Ok(None);
    };
    let Some(task_id) = active_task_event.task_id.clone() else {
        return Ok(None);
    };

    let start_data = active_task_event.parse_data::<TaskStartData>().ok();
    let description = start_data
        .as_ref()
        .map(|d| d.description.clone())
        .unwrap_or_else(|| "in-progress task".to_string());
    let original_user_message = start_data.and_then(|d| d.user_message);

    let events = agent
        .event_store
        .query_task_events_for_session(session_id, &task_id)
        .await
        .unwrap_or_default();

    let mut last_iteration: u32 = 0;
    let mut tool_results_count: u32 = 0;
    let mut pending_tool_calls: HashSet<String> = HashSet::new();
    let mut last_assistant_summary: Option<String> = None;
    let mut last_tool_summary: Option<String> = None;
    let mut last_error: Option<String> = None;
    let mut execution_snapshot: Option<ResumeExecutionSnapshot> = None;

    for event in &events {
        match event.event_type {
            EventType::ThinkingStart => {
                if let Ok(data) = event.parse_data::<ThinkingStartData>() {
                    last_iteration = last_iteration.max(data.iteration);
                }
            }
            EventType::AssistantResponse => {
                if let Ok(data) = event.parse_data::<AssistantResponseData>() {
                    if let Some(calls) = data.tool_calls.as_ref() {
                        for call in calls {
                            pending_tool_calls.insert(call.id.clone());
                        }
                    }
                    if let Some(content) = data.content.as_deref() {
                        let trimmed = content.trim();
                        if !trimmed.is_empty() {
                            last_assistant_summary = Some(truncate_for_resume(trimmed, 180));
                        }
                    }
                }
            }
            EventType::ToolResult => {
                if let Ok(data) = event.parse_data::<ToolResultData>() {
                    tool_results_count = tool_results_count.saturating_add(1);
                    pending_tool_calls.remove(&data.tool_call_id);
                    let detail = if data.success {
                        data.result
                    } else {
                        data.error.unwrap_or(data.result)
                    };
                    let detail = detail.trim();
                    if !detail.is_empty() {
                        last_tool_summary = Some(truncate_for_resume(detail, 180));
                    }
                }
            }
            EventType::Error => {
                if let Ok(data) = event.parse_data::<ErrorData>() {
                    last_error = Some(truncate_for_resume(&data.message, 180));
                }
            }
            EventType::DecisionPoint => {
                let Ok(data) = event.parse_data::<DecisionPointData>() else {
                    continue;
                };
                if data.decision_type != DecisionType::ExecutionStateSnapshot {
                    continue;
                }
                let Some(state) = data
                    .metadata
                    .get("execution_state")
                    .and_then(|value| value.as_object())
                else {
                    continue;
                };
                let execution_id = state
                    .get("execution_id")
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                let current_step = state
                    .get("current_step")
                    .and_then(|value| value.as_object());
                let last_outcome = state
                    .get("last_outcome")
                    .cloned()
                    .and_then(|value| serde_json::from_value::<StepExecutionOutcome>(value).ok());
                let background_handoff_active = state
                    .get("background_handoff_active")
                    .and_then(|value| value.as_bool())
                    .unwrap_or(false);
                let current_step_id = current_step
                    .and_then(|step| step.get("step_id"))
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                let current_tool = current_step
                    .and_then(|step| step.get("primary_tool"))
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                let current_target = current_step
                    .and_then(|step| step.get("expected_targets"))
                    .and_then(|value| value.as_array())
                    .and_then(|targets| targets.first())
                    .and_then(|target| target.get("value"))
                    .and_then(|value| value.as_str())
                    .map(str::to_string)
                    .or_else(|| {
                        current_step
                            .and_then(|step| step.get("target_scope"))
                            .and_then(|value| value.get("allowed_targets"))
                            .and_then(|value| value.as_array())
                            .and_then(|targets| targets.first())
                            .and_then(|target| target.get("value"))
                            .and_then(|value| value.as_str())
                            .map(str::to_string)
                    });
                let idempotency_key = current_step
                    .and_then(|step| step.get("idempotency_key"))
                    .and_then(|value| value.as_str())
                    .map(str::to_string);
                let Some(execution_id) = execution_id else {
                    continue;
                };
                execution_snapshot = Some(ResumeExecutionSnapshot {
                    execution_id,
                    current_step_id,
                    current_tool,
                    current_target,
                    last_outcome,
                    background_handoff_active,
                    idempotency_key,
                });
            }
            _ => {}
        }
    }

    let elapsed_secs = (Utc::now() - active_task_event.created_at)
        .num_seconds()
        .max(0) as u64;
    let mut pending_tool_call_ids: Vec<String> = pending_tool_calls.into_iter().collect();
    pending_tool_call_ids.sort();

    Ok(Some(ResumeCheckpoint {
        task_id,
        description,
        original_user_message,
        elapsed_secs,
        last_iteration,
        tool_results_count,
        pending_tool_call_ids,
        last_assistant_summary,
        last_tool_summary,
        last_error,
        execution_snapshot,
        // Carry the interrupted task's original turn_id (legacy = None) so the
        // recovery TaskEnd attributes to the correct turn, not the resume turn.
        turn_id: active_task_event.turn_id.clone(),
    }))
}

pub(in crate::agent) async fn mark_task_interrupted_for_resume(
    agent: &Agent,
    session_id: &str,
    checkpoint: &ResumeCheckpoint,
    resumed_task_id: &str,
) {
    // Best-effort: if task already has task_end, skip.
    let already_ended = agent
        .event_store
        .query_task_events_for_session(session_id, &checkpoint.task_id)
        .await
        .ok()
        .is_some_and(|events| events.iter().any(|e| e.event_type == EventType::TaskEnd));
    if already_ended {
        return;
    }

    let resume_emitter =
        crate::events::EventEmitter::new(agent.event_store.clone(), session_id.to_string())
            .with_task_id(checkpoint.task_id.clone());
    let error = format!(
        "Agent process interrupted before completion. Resumed in task {}.",
        resumed_task_id
    );
    let _ = resume_emitter
        .emit(
            EventType::TaskEnd,
            TaskEndData {
                task_id: checkpoint.task_id.clone(),
                status: TaskStatus::Failed,
                outcome: Some(crate::events::TaskOutcome::Failed),
                duration_secs: checkpoint.elapsed_secs,
                iterations: checkpoint.last_iteration,
                tool_calls_count: checkpoint.tool_results_count,
                error: Some(error),
                summary: Some("Recovered from checkpoint after interruption".to_string()),
                efficiency: None,
                // Recovery MUST use the checkpoint's original turn_id, never
                // current_turn_ids (which points at the new resume turn).
                // Legacy (TaskStart.turn_id == None) => None here.
                turn_id: checkpoint.turn_id.clone(),
                harness_eval: None,
            },
        )
        .await;
    if let Err(err) = super::dialogue_state::record_dialogue_task_end(
        agent,
        session_id,
        &checkpoint.task_id,
        TaskStatus::Failed,
    )
    .await
    {
        warn!(
            session_id,
            task_id = %checkpoint.task_id,
            error = %err,
            "Failed to record dialogue task end for stale checkpoint"
        );
    }
    agent
        .run_task_end_tool_hooks(&checkpoint.task_id, session_id)
        .await;
}

#[cfg(test)]
#[path = "resume_checkpoint_tests.rs"]
mod resume_checkpoint_tests;