omk 0.5.0

A Rust runtime for Kimi CLI. Turns prompts into proof-backed engineering runs with gates, worktrees, and replay.
Documentation
use anyhow::Result;
use std::collections::HashMap;
use std::path::Path;

use crate::runtime::events::{Event, EventBuilder, EventKind, EventWriter, RunId};
use crate::runtime::scheduler::claim::ClaimStore;
use crate::runtime::scheduler::manifest::RunManifest;
use crate::runtime::scheduler::ownership::OwnershipMap;
use crate::runtime::scheduler::runner::{RunSummary, TeamRunner};
use crate::runtime::scheduler::task::{Task, TaskState};
use crate::runtime::worker::WorkerSpec;
use chrono::Utc;
use tokio_util::sync::CancellationToken;

/// File name of the JSON marker written into a worker's state directory when
/// that worker is quarantined after one of its leases is recovered as stale.
const STALE_WORKER_CLEANUP_FILE: &str = "stale-worker-cleanup.json";

impl TeamRunner {
    /// Create a team runner for a brand-new run that starts with an empty
    /// claim store.
    ///
    /// A `"team"` manifest is built for `run_id` under `project_dir`, tagged
    /// with `task_desc`, and initialized via `RunManifest::init` before the
    /// runner itself is assembled. `state_dir` is copied into the runner and
    /// `event_writer` is taken over by it.
    ///
    /// # Errors
    /// Returns whatever error `RunManifest::init` reports.
    pub async fn init(
        run_id: &str,
        task_desc: &str,
        project_dir: &Path,
        state_dir: &Path,
        event_writer: EventWriter,
    ) -> Result<Self> {
        let manifest = RunManifest::new(run_id, "team", project_dir);
        let manifest = manifest.with_description(task_desc);
        manifest.init().await?;

        // Owned copies of the borrowed inputs the runner keeps for its lifetime.
        let owned_run_id = RunId(run_id.to_string());
        let owned_state_dir = state_dir.to_path_buf();

        let runner = Self {
            manifest,
            claim_store: ClaimStore::new(),
            ownership: OwnershipMap::new(),
            event_writer,
            state_dir: owned_state_dir,
            run_id: owned_run_id,
            last_outbox_offsets: HashMap::new(),
            last_heartbeat_ts: HashMap::new(),
            stale_task_owners: HashMap::new(),
            dead_workers: Default::default(),
        };
        Ok(runner)
    }

    /// Initialize a new team run with a pre-built task list.
    ///
    /// A `"team"` manifest for `run_id` under `project_dir` is created with a
    /// copy of `tasks` and initialized via `RunManifest::init`. The tasks
    /// themselves are then moved into a fresh claim store, in the order given.
    /// `state_dir` is copied into the runner and `event_writer` is taken over
    /// by it.
    ///
    /// # Errors
    /// Returns whatever error `RunManifest::init` reports.
    pub async fn init_with_tasks(
        run_id: &str,
        project_dir: &Path,
        state_dir: &Path,
        event_writer: EventWriter,
        tasks: Vec<Task>,
    ) -> Result<Self> {
        // The manifest needs its own copy of the list; this is the only clone required.
        let manifest = RunManifest::new(run_id, "team", project_dir).with_tasks(tasks.clone());
        manifest.init().await?;

        // `tasks` is owned and not used afterwards, so move each task into the
        // claim store instead of cloning it a second time.
        let mut claim_store = ClaimStore::new();
        for task in tasks {
            claim_store.insert(task);
        }

        Ok(Self {
            manifest,
            claim_store,
            ownership: OwnershipMap::new(),
            event_writer,
            state_dir: state_dir.to_path_buf(),
            run_id: RunId(run_id.to_string()),
            last_outbox_offsets: HashMap::new(),
            last_heartbeat_ts: HashMap::new(),
            stale_task_owners: HashMap::new(),
            dead_workers: Default::default(),
        })
    }

    /// Forward a new lease duration to the underlying claim store.
    ///
    /// `secs` is passed unchanged to `ClaimStore::set_lease_seconds`.
    /// NOTE(review): presumably the number of seconds a claim stays valid
    /// before it is treated as stale — confirm against `ClaimStore`.
    pub(crate) fn set_lease_seconds(&mut self, secs: i64) {
        self.claim_store.set_lease_seconds(secs);
    }

    /// Drive the scheduler loop to completion without external cancellation.
    ///
    /// A fresh cancellation token is created here and only lent to
    /// `run_with_cancel`, so no caller holds a handle that could cancel it.
    ///
    /// # Errors
    /// Propagates any error returned by `run_with_cancel`.
    pub async fn run(&mut self, worker_specs: &[WorkerSpec]) -> Result<RunSummary> {
        self.run_with_cancel(worker_specs, &CancellationToken::new())
            .await
    }

    /// Run the main loop with a caller-supplied cancellation token.
    ///
    /// Delegates to `run_with_cancel_reason`, supplying
    /// `"controller interrupt"` as the reason that is recorded for tasks
    /// cancelled when the token fires.
    pub(crate) async fn run_with_cancel(
        &mut self,
        worker_specs: &[WorkerSpec],
        cancel: &CancellationToken,
    ) -> Result<RunSummary> {
        self.run_with_cancel_reason(worker_specs, cancel, "controller interrupt")
            .await
    }

    pub(crate) async fn run_with_cancel_reason(
        &mut self,
        worker_specs: &[WorkerSpec],
        cancel: &CancellationToken,
        cancel_reason: &str,
    ) -> Result<RunSummary> {
        loop {
            if cancel.is_cancelled() {
                self.cancel_unfinished_tasks(cancel_reason).await?;
                self.snapshot().await?;
                break;
            }

            self.dispatch_to_workers(worker_specs).await?;
            self.poll_workers().await?;

            if cancel.is_cancelled() {
                self.cancel_unfinished_tasks(cancel_reason).await?;
                self.snapshot().await?;
                break;
            }

            self.recover_stale_leases().await?;
            self.fail_unfinished_tasks_if_no_live_workers(worker_specs)
                .await?;

            self.snapshot().await?;

            if self.claim_store.all_done() {
                break;
            }

            tokio::select! {
                biased;
                _ = cancel.cancelled() => {}
                _ = tokio::time::sleep(std::time::Duration::from_secs(
                    crate::runtime::scheduler::runner::RUNNER_POLL_INTERVAL_SECS,
                )) => {}
            }
        }

        let summary = self.claim_store.summary();
        let success = summary.failed == 0 && summary.cancelled == 0;

        if success {
            let event = EventBuilder::new(self.run_id.clone()).run_completed();
            self.event_writer.append(&event).await?;
        } else {
            let event = EventBuilder::new(self.run_id.clone())
                .run_failed("one or more tasks failed or were cancelled")?;
            self.event_writer.append(&event).await?;
        }

        Ok(RunSummary {
            run_id: self.run_id.0.clone(),
            completed: summary.completed,
            failed: summary.failed,
            cancelled: summary.cancelled,
            total: summary.total(),
        })
    }

    /// Cancel every task that has not reached a terminal state.
    ///
    /// For each task the claim store actually cancels, the task's ownership
    /// entries are released and a `TaskFailed` event is appended with
    /// `reason` as the error and the owner recorded just before cancellation
    /// as `worker_id`.
    ///
    /// # Errors
    /// Propagates errors from `ClaimStore::cancel`, event payload
    /// construction, and event append.
    async fn cancel_unfinished_tasks(&mut self, reason: &str) -> Result<()> {
        // Collect ids up front so the claim store can be mutated while iterating.
        let mut pending: Vec<String> = Vec::new();
        for task in self.claim_store.tasks().values() {
            if !task.state.is_terminal() {
                pending.push(task.id.clone());
            }
        }

        for task_id in pending {
            // Read the owner before cancelling so the event can still name it.
            let worker_id = match self.claim_store.get(&task_id) {
                Some(task) => task.owner.clone(),
                None => None,
            };

            if !self.claim_store.cancel(&task_id)? {
                continue;
            }

            if let Some(task) = self.claim_store.get(&task_id) {
                self.ownership.release_task(task);
            }

            let payload = serde_json::json!({
                "task_id": task_id,
                "worker_id": worker_id,
                "error": reason,
            });
            let event = Event::new(self.run_id.clone(), EventKind::TaskFailed)
                .with_actor("scheduler")
                .with_payload(payload)?;
            self.event_writer.append(&event).await?;
        }

        Ok(())
    }

    /// Persist the claim store's current tasks through the manifest.
    ///
    /// Works on a clone of the manifest whose task list is replaced with a
    /// copy of the claim store's tasks, then calls `save` followed by
    /// `snapshot_tasks` on that clone. The runner's own manifest is not
    /// modified.
    ///
    /// # Errors
    /// Propagates errors from `save` and `snapshot_tasks`.
    pub async fn snapshot(&self) -> Result<()> {
        let current_tasks = self.claim_store.tasks().values().cloned().collect();

        let mut working_copy = self.manifest.clone();
        working_copy.tasks = current_tasks;

        working_copy.save().await?;
        working_copy.snapshot_tasks().await?;
        Ok(())
    }

    /// Recover stale leases and record what happened to each affected task.
    ///
    /// For every recovery reported by the claim store: the task's ownership
    /// entries are released; if a stale owner is known it is remembered in
    /// `stale_task_owners` and quarantined; finally a `RetryScheduled` event
    /// is appended for the task.
    ///
    /// # Errors
    /// Propagates errors from quarantining the worker, event payload
    /// construction, and event append.
    pub(crate) async fn recover_stale_leases(&mut self) -> Result<()> {
        let recoveries = self.claim_store.recover_stale_leases_with_owners();

        for lease in &recoveries {
            let task_id = &lease.task_id;

            if let Some(task) = self.claim_store.get(task_id) {
                self.ownership.release_task(task);
            }

            if let Some(owner) = lease.stale_owner.as_deref() {
                self.stale_task_owners
                    .insert(task_id.clone(), owner.to_string());
                // Quarantine happens before the retry event is emitted.
                self.quarantine_stale_worker(owner, task_id).await?;
            }

            let payload = serde_json::json!({
                "task_id": task_id,
                "reason": "stale lease recovered",
                "stale_worker_id": lease.stale_owner,
            });
            let event = Event::new(self.run_id.clone(), EventKind::RetryScheduled)
                .with_actor("scheduler")
                .with_payload(payload)?;
            self.event_writer.append(&event).await?;
        }

        Ok(())
    }

    /// Mark `worker_id` as dead and leave a cleanup marker for it.
    ///
    /// Does nothing if the worker is already in `dead_workers`. Otherwise the
    /// worker's directory under the state dir is ensured to exist, a JSON
    /// marker naming the worker, the triggering `task_id`, and the current UTC
    /// time is written there, and a `WorkerDead` event is appended.
    ///
    /// # Errors
    /// Propagates errors from directory creation, marker serialization, the
    /// file write, event payload construction, and event append.
    async fn quarantine_stale_worker(&mut self, worker_id: &str, task_id: &str) -> Result<()> {
        // `insert` reports false when the worker was already recorded as dead.
        let newly_marked = self.dead_workers.insert(worker_id.to_string());
        if !newly_marked {
            return Ok(());
        }

        let marker = serde_json::json!({
            "worker_id": worker_id,
            "task_id": task_id,
            "reason": "stale lease recovered",
            "action": "quarantined",
            "cleaned_at": Utc::now(),
        });

        let workers_root = self.state_dir.join(crate::runtime::config::WORKERS_DIR);
        let worker_dir = workers_root.join(worker_id);
        crate::runtime::config::ensure_private_dir(&worker_dir).await?;

        let marker_path = worker_dir.join(STALE_WORKER_CLEANUP_FILE);
        let marker_text = serde_json::to_string_pretty(&marker)?;
        tokio::fs::write(marker_path, marker_text).await?;

        // The event carries the marker location as a `/`-joined string.
        let marker_location = format!(
            "{}/{}/{}",
            crate::runtime::config::WORKERS_DIR,
            worker_id,
            STALE_WORKER_CLEANUP_FILE
        );
        let payload = serde_json::json!({
            "worker_id": worker_id,
            "task_id": task_id,
            "reason": "stale lease recovered",
            "cleanup_marker": marker_location,
            "action": "quarantined",
        });
        let event = Event::new(self.run_id.clone(), EventKind::WorkerDead)
            .with_actor("scheduler")
            .with_payload(payload)?;
        self.event_writer.append(&event).await
    }

    /// Fail every non-terminal task once no configured worker is left alive.
    ///
    /// A worker counts as live when its name is absent from `dead_workers`.
    /// If at least one spec is live this returns immediately. Note that an
    /// empty `worker_specs` slice has no live worker, so all unfinished tasks
    /// are failed in that case as well.
    ///
    /// Each affected task has its ownership entries released, is set to
    /// `Failed` with owner, lease expiry, and start time cleared and
    /// `completed_at` set to the current UTC time, and gets a `TaskFailed`
    /// event naming its previous owner.
    ///
    /// # Errors
    /// Propagates errors from event payload construction and event append.
    async fn fail_unfinished_tasks_if_no_live_workers(
        &mut self,
        worker_specs: &[WorkerSpec],
    ) -> Result<()> {
        let every_worker_dead = worker_specs
            .iter()
            .all(|spec| self.dead_workers.contains(&spec.name));
        if !every_worker_dead {
            return Ok(());
        }

        let reason = "no live workers available after stale cleanup";

        // Collect ids up front so the claim store can be mutated while iterating.
        let mut unfinished: Vec<String> = Vec::new();
        for task in self.claim_store.tasks().values() {
            if !task.state.is_terminal() {
                unfinished.push(task.id.clone());
            }
        }

        for task_id in unfinished {
            // Capture the owner and release ownership before the task is rewritten.
            let previous_worker = match self.claim_store.get(&task_id) {
                Some(task) => {
                    let owner = task.owner.clone();
                    self.ownership.release_task(task);
                    owner
                }
                None => None,
            };

            match self.claim_store.tasks_mut().get_mut(&task_id) {
                Some(task) => {
                    task.state = TaskState::Failed;
                    task.owner = None;
                    task.lease_expires = None;
                    task.completed_at = Some(Utc::now());
                    task.started_at = None;
                }
                None => continue,
            }

            let payload = serde_json::json!({
                "task_id": task_id,
                "worker_id": previous_worker,
                "error": reason,
            });
            let event = Event::new(self.run_id.clone(), EventKind::TaskFailed)
                .with_actor("scheduler")
                .with_payload(payload)?;
            self.event_writer.append(&event).await?;
        }

        Ok(())
    }
}