// oxide-agent 0.1.0
//
// Type-safe, high-performance Rust crate for building agentic systems on Ollama.
//
// Mojo Interoperability — offload batch embedding to a Mojo-compiled binary.
//
// Protocol (line-delimited JSON over stdin/stdout):
//
//   stdin  → {"texts": ["hello", "world"]}
//   stdout ← {"embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]]}
//
// Enable with the `mojo-interop` feature flag:
//   oxide-agent = { features = ["mojo-interop"] }

use std::path::{Path, PathBuf};

use crate::error::OxideError;

// ── MojoEmbedder ─────────────────────────────────────────────────────────────

/// Bridge to a Mojo-compiled embedding binary for high-throughput batch
/// embedding.
///
/// The binary must implement the JSON line protocol described at the top of
/// this file.  Construct the bridge once via [`MojoEmbedder::new`] and reuse
/// it across calls: `new` only records the path, and (with the
/// `mojo-interop` feature enabled) each [`MojoEmbedder::embed_batch`] call
/// launches the binary as a fresh subprocess.
#[derive(Debug, Clone)]
pub struct MojoEmbedder {
    // Location of the Mojo executable launched by `embed_batch`.
    #[allow(dead_code)] // used only under the `mojo-interop` feature
    binary_path: PathBuf,
}

impl MojoEmbedder {
    /// Create a new embedder pointing at the given Mojo binary.
    ///
    /// Only the path is stored; no process is started and the path is not
    /// checked for existence here.
    pub fn new(binary_path: impl AsRef<Path>) -> Self {
        Self {
            binary_path: binary_path.as_ref().to_path_buf(),
        }
    }

    /// Embed a batch of strings.
    ///
    /// Spawns the Mojo binary, feeds it a single-line JSON request on stdin,
    /// and parses the first non-empty line of stdout as the JSON response.
    ///
    /// # Errors
    /// Returns an error if the `mojo-interop` feature is disabled, or if the
    /// binary cannot be spawned, exits non-zero (its stderr output is
    /// included in the message), produces no output, returns malformed JSON,
    /// or returns a different number of embeddings than texts supplied.
    pub async fn embed_batch(
        &self,
        texts: Vec<String>,
    ) -> Result<Vec<Vec<f32>>, OxideError> {
        #[cfg(feature = "mojo-interop")]
        {
            self.run_subprocess(texts).await
        }
        #[cfg(not(feature = "mojo-interop"))]
        {
            let _ = texts;
            Err(OxideError::Other(
                "MojoEmbedder requires the `mojo-interop` feature: \
                 oxide-agent = { features = [\"mojo-interop\"] }"
                    .into(),
            ))
        }
    }

    /// Run one request/response round trip against the Mojo binary.
    ///
    /// The child is spawned with `kill_on_drop`, so any early return (or a
    /// cancelled future) does not leave a stray process behind.
    #[cfg(feature = "mojo-interop")]
    async fn run_subprocess(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, OxideError> {
        use tokio::io::AsyncWriteExt;
        use tokio::process::Command;

        #[derive(serde::Deserialize)]
        struct MojoResponse {
            embeddings: Vec<Vec<f32>>,
        }

        let expected = texts.len();
        let request = serde_json::json!({"texts": texts});
        let mut request_line = serde_json::to_string(&request).map_err(OxideError::Serde)?;
        request_line.push('\n');

        let mut child = Command::new(&self.binary_path)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .map_err(|e| OxideError::Other(format!("spawn mojo binary: {e}")))?;

        // Write the request; the handle is dropped at the end of the match
        // arm, which closes the pipe so the child sees EOF.  A write failure
        // is held back for now: if the child died early, its exit status and
        // stderr are more informative than a bare "broken pipe".
        let write_result = match child.stdin.take() {
            Some(mut stdin) => stdin.write_all(request_line.as_bytes()).await,
            None => Ok(()),
        };

        // Drain stdout and stderr concurrently while waiting for exit, so a
        // child that logs heavily to stderr cannot stall on a full pipe.
        let output = child
            .wait_with_output()
            .await
            .map_err(|e| OxideError::Other(format!("wait for mojo: {e}")))?;

        if !output.status.success() {
            let status = output.status;
            let stderr = String::from_utf8_lossy(&output.stderr);
            let detail = stderr.trim();
            let message = if detail.is_empty() {
                format!("mojo binary exited with status {status}")
            } else {
                format!("mojo binary exited with status {status}: {detail}")
            };
            return Err(OxideError::Other(message));
        }

        write_result.map_err(|e| OxideError::Other(format!("write to mojo stdin: {e}")))?;

        let stdout = std::str::from_utf8(&output.stdout)
            .map_err(|e| OxideError::Other(format!("read mojo stdout: {e}")))?;

        // Parse the first non-empty line as JSON.
        let line = stdout
            .lines()
            .find(|l| !l.trim().is_empty())
            .ok_or_else(|| OxideError::Other("mojo binary produced no output".into()))?;

        let resp: MojoResponse = serde_json::from_str(line).map_err(OxideError::Serde)?;

        // Callers pair embeddings with inputs by position, so a count
        // mismatch would silently misalign results.
        if resp.embeddings.len() != expected {
            return Err(OxideError::Other(format!(
                "mojo binary returned {} embeddings for {} texts",
                resp.embeddings.len(),
                expected
            )));
        }

        Ok(resp.embeddings)
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Without the `mojo-interop` feature, `embed_batch` must fail with an
    /// error whose message names the missing feature flag.
    #[tokio::test]
    async fn stub_returns_meaningful_error_without_feature() {
        // When the feature is enabled this body compiles to nothing and the
        // test passes trivially.
        #[cfg(not(feature = "mojo-interop"))]
        {
            let bridge = MojoEmbedder::new("/usr/local/bin/mojo-embed");
            let outcome = bridge.embed_batch(vec!["hello".into()]).await;
            let rendered = outcome.unwrap_err().to_string();
            assert!(
                rendered.contains("mojo-interop"),
                "error should mention the feature flag"
            );
        }
    }
}