Skip to main content

jammi_wire/
embedding.rs

1//! `EmbeddingService` proto↔domain conversions.
2//!
3//! Maps the embedding compute wire enums/messages onto the engine's
4//! [`Modality`], [`QueryInput`], and the result-table record. Modality and
5//! input are validated at decode: an unspecified modality and a
6//! text/bytes-vs-modality mismatch are rejected with `invalid_argument`.
7//!
8//! The source-registration and model-introspection conversions
9//! (`SourceType` / `SourceConnection` / `FileFormat` / `SourceDescriptor` /
10//! `Model`) live with the control-plane catalog wire surface
11//! ([`super::catalog`]); only the compute verbs' shapes are here.
12
13use jammi_db::catalog::result_repo::ResultTableRecord;
14use tonic::Status;
15
16use crate::proto::embedding as pb;
17use crate::request::{Modality, QueryInput};
18
19/// Map the proto [`Modality`] onto the engine's [`Modality`]. An unspecified
20/// modality is rejected — a request that names no tower is a client error, not
21/// a silent default.
22impl TryFrom<pb::Modality> for Modality {
23    type Error = Status;
24
25    fn try_from(modality: pb::Modality) -> Result<Self, Self::Error> {
26        match modality {
27            pb::Modality::Text => Ok(Modality::Text),
28            pb::Modality::Image => Ok(Modality::Image),
29            pb::Modality::Audio => Ok(Modality::Audio),
30            pb::Modality::Unspecified => {
31                Err(Status::invalid_argument("modality must be specified"))
32            }
33        }
34    }
35}
36
37/// Decode the raw enum discriminant a request carries. An out-of-range value is
38/// rejected with the same message an `UNSPECIFIED` modality is — the request
39/// names no valid tower either way.
40impl TryFrom<i32> for Modality {
41    type Error = Status;
42
43    fn try_from(modality: i32) -> Result<Self, Self::Error> {
44        match pb::Modality::try_from(modality) {
45            Ok(m) => Modality::try_from(m),
46            Err(_) => Err(Status::invalid_argument("modality must be specified")),
47        }
48    }
49}
50
51/// The proto query oneof paired with its resolved [`Modality`]. The oneof alone
52/// does not say which tower it feeds, so decode takes both: TEXT requires
53/// `text`, IMAGE/AUDIO require `data` (raw bytes); a missing oneof or a mismatch
54/// is a client error.
55pub struct ProtoQueryInput {
56    pub input: Option<pb::encode_query_request::Input>,
57    pub modality: Modality,
58}
59
60impl TryFrom<ProtoQueryInput> for QueryInput {
61    type Error = Status;
62
63    fn try_from(value: ProtoQueryInput) -> Result<Self, Self::Error> {
64        use pb::encode_query_request::Input as ProtoInput;
65        let input = value
66            .input
67            .ok_or_else(|| Status::invalid_argument("input (text or data) is required"))?;
68        match (value.modality, input) {
69            (Modality::Text, ProtoInput::Text(text)) => {
70                if text.is_empty() {
71                    return Err(Status::invalid_argument("text is required"));
72                }
73                Ok(QueryInput::Text(text))
74            }
75            (Modality::Image | Modality::Audio, ProtoInput::Data(data)) => {
76                if data.is_empty() {
77                    return Err(Status::invalid_argument("data is required"));
78                }
79                Ok(QueryInput::Bytes(data))
80            }
81            (Modality::Text, ProtoInput::Data(_)) => Err(Status::invalid_argument(
82                "TEXT modality requires text input, got data",
83            )),
84            (Modality::Image | Modality::Audio, ProtoInput::Text(_)) => Err(
85                Status::invalid_argument("IMAGE/AUDIO modality requires data input, got text"),
86            ),
87        }
88    }
89}
90
91/// Encode the engine's result-table record into the wire `ResultTable`. The
92/// engine's optional `dimensions` is flattened to `0` for a non-embedding /
93/// unset result, `row_count` widens to the wire's `u64`, and `task` rides the
94/// shared [`super::model_task_to_proto`] task vocabulary.
95///
96/// The wire `ResultTable` carries its own `task` (the embedding tower), so the
97/// reconstruction recovers it faithfully from the message itself — never from a
98/// modality threaded in out of band, never a guess.
99impl From<ResultTableRecord> for pb::ResultTable {
100    fn from(record: ResultTableRecord) -> Self {
101        pb::ResultTable {
102            table_name: record.table_name,
103            source_id: record.source_id,
104            model_id: record.model_id,
105            dimensions: record.dimensions.unwrap_or(0),
106            row_count: record.row_count as u64,
107            status: record.status,
108            task: super::model_task_to_proto(record.task) as i32,
109            // A bare record carries no producer cache outcome (a catalog
110            // projection, not a producer return) → `UNSPECIFIED`, the honest
111            // "no producer ran" value. A producer handler uses
112            // [`result_table_with_outcome`] to carry the real outcome.
113            cache_outcome: crate::proto::inference::CacheOutcome::Unspecified as i32,
114        }
115    }
116}
117
118/// Encode a producer's result-table record **with** the cache outcome it
119/// returned, so reuse is observable on the wire — the shape a producer RPC
120/// handler builds, distinct from the bare [`From`] projection (which has no
121/// producer to attribute an outcome to). `outcome` is the wire enum value from
122/// the engine's `CacheOutcome` (`COMPUTED` / `REUSED`).
123pub fn result_table_with_outcome(record: ResultTableRecord, outcome: i32) -> pb::ResultTable {
124    pb::ResultTable {
125        cache_outcome: outcome,
126        ..pb::ResultTable::from(record)
127    }
128}
129
130/// Reconstruct the engine's result-table record from the wire `ResultTable` a
131/// `GenerateEmbeddings` or `DescribeSource` response carries.
132///
133/// The wire message is the client-observable projection: it carries the fields
134/// a client needs to locate and query the persisted embedding table
135/// (`table_name`, `source_id`, `model_id`, `dimensions`, `row_count`, `status`,
136/// `task`). The engine's server-internal bookkeeping — storage/index paths,
137/// timestamps, the originating columns — is intentionally not on the wire, so
138/// the reconstruction leaves those at their "not carried" values (`String::new`
139/// / `None`). A remote consumer keys off the same fields a local one reads back;
140/// the dropped fields are server-side state, not result data. The message is
141/// self-describing in `task`, so an out-of-range/unspecified task is the
142/// faithful `invalid_argument` the shared decoder builds.
143pub fn result_table_from_proto(table: pb::ResultTable) -> Result<ResultTableRecord, Status> {
144    let task = super::model_task_from_proto(table.task)?;
145    Ok(ResultTableRecord {
146        table_name: table.table_name,
147        source_id: table.source_id,
148        model_id: table.model_id,
149        task,
150        // `kind`/`derived_from` are server-internal bookkeeping, not carried on
151        // the wire — `GenerateEmbeddings` only ever returns a model output, so
152        // the reconstruction defaults to that kind.
153        kind: jammi_db::catalog::result_repo::ResultTableKind::Model,
154        derived_from: None,
155        parquet_path: String::new(),
156        index_path: None,
157        dimensions: (table.dimensions != 0).then_some(table.dimensions),
158        distance_metric: String::new(),
159        row_count: table.row_count as usize,
160        status: table.status,
161        key_column: None,
162        text_columns: None,
163        created_at: String::new(),
164        completed_at: None,
165        // Tenant identity is server-side bookkeeping resolved from the session,
166        // not carried on the result wire — the reconstruction leaves it absent.
167        tenant_id: None,
168        // The materialization-contract summary columns are server-side
169        // provenance, not carried on the `GenerateEmbeddings` result wire; a
170        // remote consumer that wants the attestation reaches it through
171        // `verify_materialization`, not this reconstruction.
172        definition_hash: None,
173        input_anchors_json: None,
174    })
175}