omk 0.5.0

A Rust runtime for Kimi CLI. Turns prompts into proof-backed engineering runs with gates, worktrees, and replay.
Documentation
use anyhow::Result;
use std::path::Path;

use crate::runtime::config::WORKERS_DIR;
use crate::runtime::events::{Event, EventBuilder, EventKind, EventWriter, RunId, WorkerId};
use crate::runtime::scheduler::decompose::{Subtask, SynthesisAgent};
use crate::runtime::state::TeamState;
use crate::runtime::wire_worker::WireWorkerAdapter;
use crate::runtime::worker::{WorkerResult, WorkerSpec};

pub(super) struct KimiRunMetadata {
    pub(super) binary: String,
    pub(super) cli_version: Option<String>,
    pub(super) wire_protocol_version: String,
}

pub(super) async fn detect_kimi_run_metadata(kimi_bin: &str) -> KimiRunMetadata {
    let cli_version = command_first_line(kimi_bin, &["--version"]).await;
    let wire_protocol_version = command_output(kimi_bin, &["info"])
        .await
        .and_then(|info| parse_wire_protocol_version(&info))
        .unwrap_or_else(|| crate::wire::protocol::KIMI_WIRE_PROTOCOL_VERSION.to_string());

    KimiRunMetadata {
        binary: kimi_bin.to_string(),
        cli_version,
        wire_protocol_version,
    }
}

pub(super) fn fallback_subtasks(task: &str, count: usize) -> Vec<Subtask> {
    (0..count)
        .map(|i| Subtask {
            id: format!("task-{}", i + 1),
            description: format!("{} — worker-{} focus", task, i),
            read_set: Vec::new(),
            write_set: Vec::new(),
        })
        .collect()
}

async fn command_first_line(binary: &str, args: &[&str]) -> Option<String> {
    command_output(binary, args)
        .await
        .map(|text| text.lines().next().unwrap_or(&text).trim().to_string())
}

async fn command_output(binary: &str, args: &[&str]) -> Option<String> {
    let output = tokio::time::timeout(
        std::time::Duration::from_secs(2),
        tokio::process::Command::new(binary).args(args).output(),
    )
    .await
    .ok()?
    .ok()?;

    if !output.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    let text = if stdout.trim().is_empty() {
        stderr.trim()
    } else {
        stdout.trim()
    };
    (!text.is_empty()).then(|| text.to_string())
}

fn parse_wire_protocol_version(info_output: &str) -> Option<String> {
    for line in info_output.lines() {
        let lower = line.to_ascii_lowercase();
        if lower.contains("wire protocol") {
            return line
                .split([':', '='])
                .nth(1)
                .map(str::trim)
                .filter(|value| !value.is_empty())
                .map(str::to_string);
        }
    }
    None
}

pub(super) async fn synthesize_results(
    worker_specs: &[WorkerSpec],
    state_dir: &Path,
    event_writer: &EventWriter,
    run_id: &RunId,
    kimi_bin: &str,
) -> Result<String> {
    let mut worker_results = Vec::new();
    for spec in worker_specs {
        if !spec.outbox.exists() {
            continue;
        }
        let content = tokio::fs::read_to_string(&spec.outbox).await?;
        for line in content.lines() {
            if line.trim().is_empty() {
                continue;
            }
            if let Ok(result) = serde_json::from_str::<WorkerResult>(line) {
                worker_results.push(format!(
                    "{} ({}): {}",
                    spec.name, result.task_id, result.summary
                ));
            }
        }
    }

    if worker_results.is_empty() {
        return Ok("No worker results available.".to_string());
    }

    let results_text = worker_results.join("\n");
    let prompt = format!(
        "You are a synthesis agent. The following subtasks were completed by a team of workers:\n{}\n\nSynthesize a concise final summary (2-3 sentences) of what was accomplished.",
        results_text
    );

    let synthesis = SynthesisAgent::synthesize(&prompt, kimi_bin).await?;

    let synthesis_path = state_dir.join("synthesis.txt");
    tokio::fs::write(&synthesis_path, &synthesis).await?;

    let event = Event::new(run_id.clone(), EventKind::TaskCompleted)
        .with_actor("synthesis-agent")
        .with_payload(serde_json::json!({
            "task_id": "synthesis",
            "summary": &synthesis,
        }))?;
    event_writer.append(&event).await?;

    Ok(synthesis)
}

pub(super) struct WireWorkerSetup<'a> {
    pub(super) team_name: &'a str,
    pub(super) task: &'a str,
    pub(super) count: usize,
    pub(super) role: &'a str,
    pub(super) state_dir: &'a Path,
    pub(super) dir: &'a Path,
    pub(super) event_writer: &'a EventWriter,
    pub(super) run_id: &'a RunId,
    pub(super) cancel_token: tokio_util::sync::CancellationToken,
}

pub(super) async fn setup_wire_workers(
    config: WireWorkerSetup<'_>,
) -> Result<(Vec<WorkerSpec>, Vec<tokio::task::JoinHandle<()>>)> {
    let state = TeamState::new(
        config.team_name,
        config.task,
        config.state_dir,
        config.count,
        config.role,
    );
    state.save().await?;

    let mcp_bridge = crate::mcp::bridge::maybe_create_bridge().await;

    let mut worker_specs = Vec::new();
    let mut handles = Vec::new();

    for i in 0..config.count {
        let worker_name = format!("worker-{i}");
        let worker_dir = config.state_dir.join(WORKERS_DIR).join(&worker_name);
        tokio::fs::create_dir_all(&worker_dir).await?;

        let worker_spec = WorkerSpec {
            name: worker_name.clone(),
            role: config.role.to_string(),
            inbox: worker_dir.join("inbox.jsonl"),
            outbox: worker_dir.join("outbox.jsonl"),
            heartbeat: worker_dir.join("heartbeat.json"),
            project_dir: Some(config.dir.to_path_buf()),
            external_tools: None,
            approval_policy: crate::runtime::wire_worker::ApprovalPolicy::default(),
            approval_timeout_secs: crate::runtime::worker::default_approval_timeout_secs(),
        };
        worker_spec.save().await?;
        worker_specs.push(worker_spec.clone());

        let adapter = WireWorkerAdapter::new_with_cancel(
            worker_spec,
            config.run_id.clone(),
            config.event_writer.clone(),
            config.cancel_token.clone(),
        )
        .with_mcp_bridge(mcp_bridge.clone());
        let handle = adapter.spawn();
        handles.push(handle);

        let worker_started = EventBuilder::new(config.run_id.clone())
            .worker_started(WorkerId(worker_name.clone()), config.role)?;
        config.event_writer.append(&worker_started).await?;
    }

    Ok((worker_specs, handles))
}