omk 0.5.0

A Rust runtime for Kimi CLI. Turns prompts into proof-backed engineering runs with gates, worktrees, and replay.
Documentation
use std::sync::Arc;
use std::time::{Duration, Instant};

use omk::runtime::classifier::Intent;
use omk::runtime::conversation::bus::{BusEvent, PreflightAction, PreflightKind};
use omk::runtime::conversation::outcome::RouteOutcome;
use omk::runtime::escalation::{
    backends::{MediumPlanResult, SmallEditResult},
    mocks::{MockClassifier, MockGoalBridge, MockLlmDirect, MockWireWorker},
    router::{Router, RouterConfig},
};

use crate::common::{make_classifier_output, make_handle, make_session};

#[tokio::test]
async fn test_concurrency_cap_blocks_fourth_medium_goal() {
    let config = RouterConfig {
        interactive_preflight: true,
        medium_goal_cap: 3,
        ..Default::default()
    };
    let bus = omk::runtime::conversation::bus::EventBus::new().arc();
    let mut rx = bus.subscribe();

    let (tx1, rx1) = tokio::sync::oneshot::channel();
    let (tx2, rx2) = tokio::sync::oneshot::channel();
    let (tx3, rx3) = tokio::sync::oneshot::channel();

    let wire = MockWireWorker::new(
        SmallEditResult {
            worker_id: "sw1".into(),
            files_touched: 0,
            diff_summary: "".into(),
        },
        MediumPlanResult {
            workers: vec!["mw".into()],
            steps_completed: 1,
            steps_failed: 0,
        },
    );
    wire.push_medium_block(rx1).await;
    wire.push_medium_block(rx2).await;
    wire.push_medium_block(rx3).await;

    let router = Arc::new(Router::new(
        Arc::new(MockClassifier::new(make_classifier_output(
            Intent::Medium,
            0.88,
        ))),
        Arc::new(MockLlmDirect::new(0)),
        Arc::new(wire),
        Arc::new(MockGoalBridge::new(make_handle("g1"))),
        config,
        bus.clone(),
    ));

    let session = make_session();

    for _ in 0..3 {
        let r = router.clone();
        let s = session.clone();
        tokio::spawn(async move {
            let _ = r.dispatch("do work", &s).await;
        });
    }

    let deadline = Instant::now() + Duration::from_secs(2);
    while Instant::now() < deadline {
        if session.active_medium_goals.lock().await.len() == 3 {
            break;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert_eq!(session.active_medium_goals.lock().await.len(), 3);

    let r4 = router.clone();
    let handle4 = tokio::spawn(async move { r4.dispatch("do more work", &session).await });

    let mut ticket_id = None;
    while let Ok(ev) = rx.recv().await {
        if let BusEvent::PreflightRequest(ref p) = ev {
            if matches!(p.kind, PreflightKind::QueueMediumAtConcurrencyCap) {
                ticket_id = Some(p.ticket_id.clone());
                break;
            }
        }
    }
    assert!(
        ticket_id.is_some(),
        "expected queue preflight for 4th medium"
    );

    router
        .submit_preflight(ticket_id.unwrap(), PreflightAction::Cancel)
        .await;
    let outcome = handle4.await.unwrap();
    assert!(matches!(outcome, RouteOutcome::Cancelled));

    let _ = tx1.send(());
    let _ = tx2.send(());
    let _ = tx3.send(());
}