car-inference 0.15.0

Local model inference for CAR — Candle backend with Qwen3 models
//! Foreign-implemented inference runner.
//!
//! Closes [Parslee-ai/car-releases#24]. The runner pattern lets a host
//! (Node.js, Python, Swift, Kotlin, or a remote JSON-RPC client) own
//! the wire format for cloud chat APIs while CAR stays in the
//! lifecycle path — observing every event, applying policy, recording
//! to the eventlog, supporting replay.
//!
//! When a model schema declares `source: ModelSource::Delegated { .. }`,
//! [`InferenceEngine::generate_tracked_stream`] checks the
//! process-wide runner slot ([`set_inference_runner`]); if a runner
//! is installed, the request is handed to it and its emitted events
//! flow through the stream's `Receiver<StreamEvent>` exactly as if a
//! native backend had produced them.
//!
//! The contract runs in two directions:
//! - Rust → host: `run(request)` is invoked with a fully-formed
//!   [`GenerateRequest`]. The trait is async so the host can do its
//!   own HTTP / SDK work. The host receives an event emitter it
//!   should call as chunks arrive.
//! - host → Rust: every chunk becomes a `StreamEvent` via the
//!   provided emitter, and the final result (text plus tool calls)
//!   is the trait method's return value.
//!
//! Only one runner can be registered at a time (mirrors
//! [`car_multi::AgentRunner`]'s singleton constraint). Re-registering
//! overwrites the slot.
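//!
//! A minimal runner sketch (the provider call is stubbed with a
//! fixed chunk list; a real host would bridge to its SDK here, and
//! `MyCloudRunner` is an illustrative name):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! struct MyCloudRunner;
//!
//! #[async_trait::async_trait]
//! impl InferenceRunner for MyCloudRunner {
//!     async fn run(
//!         &self,
//!         request: GenerateRequest,
//!         emitter: EventEmitter,
//!     ) -> Result<RunnerResult, RunnerError> {
//!         // A real runner would translate `request` into a
//!         // provider call here.
//!         let mut text = String::new();
//!         // Forward each upstream chunk and aggregate locally so the
//!         // returned text matches the concatenated `TextDelta`s.
//!         for chunk in ["Hello", ", world"] {
//!             text.push_str(chunk);
//!             emitter.emit(StreamEvent::TextDelta(chunk.into())).await;
//!         }
//!         emitter
//!             .emit(StreamEvent::Done { text: text.clone(), tool_calls: vec![] })
//!             .await;
//!         Ok(RunnerResult { text, tool_calls: vec![] })
//!     }
//! }
//!
//! // Install process-wide; delegated schemas now route through it.
//! set_inference_runner(Some(Arc::new(MyCloudRunner)));
//! ```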
//!
//! [Parslee-ai/car-releases#24]: https://github.com/Parslee-ai/car-releases/issues/24

use std::sync::{Arc, OnceLock, RwLock};

use serde::{Deserialize, Serialize};

use crate::stream::StreamEvent;
use crate::tasks::generate::GenerateRequest;
use crate::InferenceError;

/// Final result returned by the runner once the stream is complete.
/// Mirrors what [`crate::StreamAccumulator::finish`] yields for native
/// backends — runners are expected to aggregate as they emit so the
/// returned text matches the concatenation of `TextDelta`s.
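///
/// A sketch of that invariant (`deltas` stands in for the emitted
/// `TextDelta` payloads, an illustrative local):
///
/// ```ignore
/// assert_eq!(result.text, deltas.concat());
/// ```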
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RunnerResult {
    pub text: String,
    #[serde(default)]
    pub tool_calls: Vec<crate::tasks::generate::ToolCall>,
}

/// Errors a runner can surface back to the engine. Kept as a string
/// payload so foreign hosts (NAPI, PyO3, UniFFI) can populate it
/// without sharing concrete error types.
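///
/// A hedged sketch of the intended split (`hint`, `status`, and
/// `body` are illustrative locals):
///
/// ```ignore
/// // The schema's hint matched no provider this runner knows about.
/// Err(RunnerError::Declined(format!("unknown hint: {hint}")))
///
/// // The provider was reached but the call failed at the wire layer.
/// Err(RunnerError::Failed(format!("HTTP {status}: {body}")))
/// ```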
#[derive(Debug, Clone, thiserror::Error)]
pub enum RunnerError {
    /// The runner declined the request (e.g., the schema's `hint`
    /// didn't match any provider it knows about).
    #[error("runner declined: {0}")]
    Declined(String),
    /// The runner attempted the request but failed at the wire layer
    /// (HTTP error, auth failure, provider rate limit, etc.).
    #[error("runner failed: {0}")]
    Failed(String),
}

impl From<RunnerError> for InferenceError {
    fn from(value: RunnerError) -> Self {
        InferenceError::InferenceFailed(value.to_string())
    }
}

/// Sink the runner uses to emit stream events as they arrive from the
/// upstream provider. The runner calls `emit(event)` for every chunk;
/// CAR receives it through the [`StreamEvent`] channel returned from
/// [`crate::InferenceEngine::generate_tracked_stream`].
///
/// Cloning shares the underlying channel — a runner that fans out
/// (e.g., separate workers per tool call) can clone the emitter once
/// per worker.
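///
/// A fan-out sketch (`workers` and `next_event` are illustrative):
///
/// ```ignore
/// for worker in workers {
///     let emitter = emitter.clone(); // same underlying channel
///     tokio::spawn(async move {
///         while let Some(event) = worker.next_event().await {
///             emitter.emit(event).await;
///         }
///     });
/// }
/// ```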
#[derive(Clone)]
pub struct EventEmitter {
    tx: tokio::sync::mpsc::Sender<StreamEvent>,
}

impl EventEmitter {
    pub(crate) fn new(tx: tokio::sync::mpsc::Sender<StreamEvent>) -> Self {
        Self { tx }
    }

    /// Emit one event. Best-effort: if the receiver was dropped
    /// (caller stopped listening), the event is silently discarded.
    /// The runner can detect this via [`Self::is_closed`] and bail
    /// early if it wants to stop the upstream call.
    pub async fn emit(&self, event: StreamEvent) {
        let _ = self.tx.send(event).await;
    }

    /// Has the receiver been dropped? Runners that pump events
    /// indefinitely should poll this to stop on consumer cancellation.
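    ///
    /// A typical pump loop (`upstream` is an illustrative stream of
    /// provider chunks):
    ///
    /// ```ignore
    /// while let Some(chunk) = upstream.next().await {
    ///     if emitter.is_closed() {
    ///         break; // consumer cancelled; stop the upstream call
    ///     }
    ///     emitter.emit(StreamEvent::TextDelta(chunk)).await;
    /// }
    /// ```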
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}

/// The trait foreign hosts implement to handle delegated inference.
///
/// Implementations are typically thin shims around a host-owned SDK
/// (Vercel AI SDK on Node, Anthropic's Python client, etc.). They
/// translate the supplied [`GenerateRequest`] to a wire-format call,
/// stream chunks back through the [`EventEmitter`], and return a
/// [`RunnerResult`] with the aggregated final text.
#[async_trait::async_trait]
pub trait InferenceRunner: Send + Sync {
    /// Drive the request to completion. The runner MUST emit at
    /// least a final [`StreamEvent::Done`] event before returning,
    /// for parity with native backends — consumers of the resulting
    /// stream rely on `Done` as the terminal marker.
    async fn run(
        &self,
        request: GenerateRequest,
        emitter: EventEmitter,
    ) -> Result<RunnerResult, RunnerError>;
}

fn runner_slot() -> &'static RwLock<Option<Arc<dyn InferenceRunner>>> {
    static SLOT: OnceLock<RwLock<Option<Arc<dyn InferenceRunner>>>> = OnceLock::new();
    SLOT.get_or_init(|| RwLock::new(None))
}

/// Install a process-wide inference runner. Re-registering overwrites
/// any previous runner. Pass `None` to clear the slot (rarely useful
/// in production, but handy in tests).
pub fn set_inference_runner(runner: Option<Arc<dyn InferenceRunner>>) {
    let mut guard = runner_slot()
        .write()
        .expect("inference runner slot poisoned");
    *guard = runner;
}

/// Snapshot the currently registered runner (if any). Cheap clone of
/// the `Arc`; safe to call from any thread.
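///
/// A simplified sketch of the engine-side dispatch (per the module
/// docs, the real call site lives in `generate_tracked_stream`):
///
/// ```ignore
/// if let Some(runner) = current_inference_runner() {
///     // Delegate: events flow through `emitter`; the aggregated
///     // result comes back as the return value.
///     let result = runner.run(request, emitter).await?;
/// }
/// ```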
pub fn current_inference_runner() -> Option<Arc<dyn InferenceRunner>> {
    runner_slot()
        .read()
        .expect("inference runner slot poisoned")
        .clone()
}

#[cfg(test)]
mod tests {
    use super::*;

    struct EchoRunner;

    #[async_trait::async_trait]
    impl InferenceRunner for EchoRunner {
        async fn run(
            &self,
            request: GenerateRequest,
            emitter: EventEmitter,
        ) -> Result<RunnerResult, RunnerError> {
            let text = format!("echo:{}", request.prompt);
            emitter.emit(StreamEvent::TextDelta(text.clone())).await;
            emitter
                .emit(StreamEvent::Done {
                    text: text.clone(),
                    tool_calls: vec![],
                })
                .await;
            Ok(RunnerResult {
                text,
                tool_calls: vec![],
            })
        }
    }

    #[test]
    fn slot_round_trips() {
        // The slot is process-wide and its OnceLock stays initialised
        // across tests in the same binary, so clear it first to
        // guarantee a known starting state.
        set_inference_runner(None);
        assert!(current_inference_runner().is_none());
        set_inference_runner(Some(Arc::new(EchoRunner)));
        assert!(current_inference_runner().is_some());
        set_inference_runner(None);
        assert!(current_inference_runner().is_none());
    }

    #[tokio::test]
    async fn runner_can_emit_then_finish() {
        let runner: Arc<dyn InferenceRunner> = Arc::new(EchoRunner);
        let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamEvent>(8);
        let emitter = EventEmitter::new(tx);
        let request = GenerateRequest {
            prompt: "hi".into(),
            ..Default::default()
        };
        let result = runner.run(request, emitter).await.unwrap();
        assert_eq!(result.text, "echo:hi");
        // Drain emitted events.
        let mut got = Vec::new();
        while let Ok(evt) =
            tokio::time::timeout(std::time::Duration::from_millis(20), rx.recv()).await
        {
            match evt {
                Some(e) => got.push(e),
                None => break,
            }
        }
        assert_eq!(got.len(), 2);
        assert!(matches!(got[0], StreamEvent::TextDelta(_)));
        assert!(matches!(got[1], StreamEvent::Done { .. }));
    }
}