jammi-wire 0.26.3

The Jammi gRPC wire substrate: generated jammi.v1 tonic stubs, proto↔domain conversions, and the shared session transport
Documentation
//! The Jammi gRPC wire substrate.
//!
//! `jammi-wire` is the candle-free home for everything both sides of the
//! `jammi.v1` wire share: the generated tonic stubs ([`proto`]), the
//! proto↔domain conversions, the request / eval / fine-tune vocabulary the
//! conversions map, the Arrow-IPC framing helpers, and the [`SessionTransport`]
//! the typed clients build their per-service stubs over.
//!
//! Both sides of the wire live here so the conversions compile once and are used
//! in both directions: the prost types are local to this crate (generated by
//! `build.rs`) and the domain types are local too (the moved vocabulary and the
//! `jammi-db` substrate), so `From`/`TryFrom` impls satisfy the orphan rule
//! without a newtype wrapper. `jammi-server` consumes the server stubs + these
//! conversions; `jammi-admin` / `jammi-client` consume the client stubs + the
//! same conversions — neither reimplements a mapping.
//!
//! Arrow record batches cross the wire through the IPC-stream helpers
//! ([`encode_ipc_stream`] / [`decode_ipc_stream`]), which carry a self-describing
//! IPC stream (schema + batches) in one `ArrowBatch.data_body` — the Flight-IPC
//! pairing the trigger, inference, and mutable-table surfaces share.

use arrow::record_batch::RecordBatch;
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::SchemaRef;
use tonic::Status;

pub mod proto;

pub mod eval;
pub mod fine_tune;
pub mod request;
mod transport;

mod audit;
mod catalog;
mod channel;
mod embedding;
mod error;
mod eval_wire;
mod inference;
mod mutable_table;
mod training;
mod trigger;

pub use transport::{SessionChannel, SessionHeader, SessionTransport, SESSION_HEADER};

pub use audit::{parse_query_id, record_from_wire};
pub use catalog::{
    model_from_proto, model_to_proto, source_descriptor_from_proto, source_type_from_proto,
    source_type_to_proto, topic_from_proto, topic_to_proto,
};
pub use channel::{
    channel_from_proto, channel_to_proto, columns_from_proto, columns_to_proto, parse_channel_id,
};
pub use embedding::{result_table_from_proto, ProtoQueryInput};
pub use error::{
    attach_audit_detail, attach_error_detail, attach_trigger_detail, audit_error_from_status,
    error_from_status, trigger_error_from_status,
};
pub use eval_wire::{
    calibration_shape_from_proto, calibration_shape_to_proto, cohorts_from_proto, cohorts_to_proto,
    eval_task_to_proto, EvalTaskFromWire,
};
pub use inference::infer_result_to_proto;
pub use mutable_table::{
    definition_from_proto, definition_list_from_proto, definition_to_proto, parse_table_id,
};
pub use training::{config_to_proto, method_from_proto, method_to_proto};
pub use trigger::{
    decode_publish_batch, decode_subscribed_batch, encode_delivered_batch, encode_publish_batch,
    from_proto_timestamp, to_proto_timestamp,
};

/// Map the wire [`proto::inference::ModelTask`] onto the substrate's
/// [`jammi_db::ModelTask`]. An unspecified task is rejected — a request that
/// names no task is a client error, not a silent default. Shared by the
/// inference and fine-tune surfaces, which both carry `jammi.v1.inference
/// .ModelTask`.
pub fn model_task_from_proto(task: i32) -> Result<jammi_db::ModelTask, Status> {
    use jammi_db::ModelTask;
    use proto::inference::ModelTask as ProtoModelTask;
    match ProtoModelTask::try_from(task) {
        Ok(ProtoModelTask::TextEmbedding) => Ok(ModelTask::TextEmbedding),
        Ok(ProtoModelTask::ImageEmbedding) => Ok(ModelTask::ImageEmbedding),
        Ok(ProtoModelTask::AudioEmbedding) => Ok(ModelTask::AudioEmbedding),
        Ok(ProtoModelTask::Classification) => Ok(ModelTask::Classification),
        Ok(ProtoModelTask::Ner) => Ok(ModelTask::Ner),
        Ok(ProtoModelTask::Regression) => Ok(ModelTask::Regression),
        Ok(ProtoModelTask::Unspecified) | Err(_) => {
            Err(Status::invalid_argument("task must be specified"))
        }
    }
}

/// Encode the substrate's [`jammi_db::ModelTask`] onto the wire enum — the
/// inverse of [`model_task_from_proto`], for the client send side. Total: every
/// task maps to a concrete wire variant (the type has no unspecified state).
/// Shared by the inference and fine-tune send surfaces, which both carry
/// `jammi.v1.inference.ModelTask`.
pub fn model_task_to_proto(task: jammi_db::ModelTask) -> proto::inference::ModelTask {
    use jammi_db::ModelTask;
    use proto::inference::ModelTask as ProtoModelTask;
    match task {
        ModelTask::TextEmbedding => ProtoModelTask::TextEmbedding,
        ModelTask::ImageEmbedding => ProtoModelTask::ImageEmbedding,
        ModelTask::AudioEmbedding => ProtoModelTask::AudioEmbedding,
        ModelTask::Classification => ProtoModelTask::Classification,
        ModelTask::Ner => ProtoModelTask::Ner,
        ModelTask::Regression => ProtoModelTask::Regression,
    }
}

/// Encode a sequence of record batches into one self-describing Arrow IPC
/// stream (schema message followed by each batch). The result is the
/// `data_body` of an `ArrowBatch`; `data_header` stays empty because the
/// stream already carries its schema. [`decode_ipc_stream`] is the inverse.
///
/// An empty batch slice still encodes a valid stream carrying just `schema`,
/// so a zero-row inference result round-trips to an empty `Vec<RecordBatch>`
/// rather than an error.
pub fn encode_ipc_stream(schema: &SchemaRef, batches: &[RecordBatch]) -> Result<Vec<u8>, Status> {
    let mut buf: Vec<u8> = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut buf, schema.as_ref())
            .map_err(|e| Status::internal(format!("batch encode: {e}")))?;
        for batch in batches {
            writer
                .write(batch)
                .map_err(|e| Status::internal(format!("batch encode: {e}")))?;
        }
        writer
            .finish()
            .map_err(|e| Status::internal(format!("batch encode: {e}")))?;
    }
    Ok(buf)
}

/// Decode an Arrow IPC stream's schema message into a [`SchemaRef`]. The bytes
/// are a self-describing IPC stream (a `schema` message, optionally followed by
/// batches) — the same framing [`encode_ipc_stream`] produces; a schema-only
/// payload is `encode_ipc_stream(schema, &[])`. Used by the verbs that carry a
/// table/topic schema declaration on the wire rather than a batch of rows.
pub fn decode_ipc_schema(bytes: &[u8]) -> Result<SchemaRef, Status> {
    if bytes.is_empty() {
        return Err(Status::invalid_argument("schema is required"));
    }
    let cursor = std::io::Cursor::new(bytes.to_vec());
    let reader = StreamReader::try_new(cursor, None)
        .map_err(|e| Status::invalid_argument(format!("schema decode: {e}")))?;
    Ok(reader.schema())
}

/// Decode the `data_header` + `data_body` of an `ArrowBatch` into the record
/// batches it carries. Concatenates the two byte runs (the trigger pairing puts
/// the schema header inline in `data_body` and leaves `data_header` empty, but a
/// producer may split them) and reads every batch from the resulting IPC stream.
/// Inverse of [`encode_ipc_stream`].
pub fn decode_ipc_stream(data_header: &[u8], data_body: &[u8]) -> Result<Vec<RecordBatch>, Status> {
    // An all-empty payload carries no schema and no batches — the encoder's
    // representation of "zero rows, schema unknown". Decode it to an empty
    // batch list rather than feeding `StreamReader` a truncated stream.
    if data_header.is_empty() && data_body.is_empty() {
        return Ok(Vec::new());
    }
    let mut bytes = Vec::with_capacity(data_header.len() + data_body.len());
    bytes.extend_from_slice(data_header);
    bytes.extend_from_slice(data_body);
    let cursor = std::io::Cursor::new(bytes);
    let reader = StreamReader::try_new(cursor, None)
        .map_err(|e| Status::invalid_argument(format!("batch decode: {e}")))?;
    reader
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| Status::invalid_argument(format!("batch decode: {e}")))
}