jammi_wire/embedding.rs
1//! `EmbeddingService` proto↔domain conversions.
2//!
3//! Maps the embedding compute wire enums/messages onto the engine's
4//! [`Modality`], [`QueryInput`], and the result-table record. Modality and
5//! input are validated at decode: an unspecified modality and a
6//! text/bytes-vs-modality mismatch are rejected with `invalid_argument`.
7//!
8//! The source-registration and model-introspection conversions
9//! (`SourceType` / `SourceConnection` / `FileFormat` / `SourceDescriptor` /
10//! `Model`) live with the control-plane catalog wire surface
11//! ([`super::catalog`]); only the compute verbs' shapes are here.
12
13use jammi_db::catalog::result_repo::ResultTableRecord;
14use tonic::Status;
15
16use crate::proto::embedding as pb;
17use crate::request::{Modality, QueryInput};
18
19/// Map the proto [`Modality`] onto the engine's [`Modality`]. An unspecified
20/// modality is rejected — a request that names no tower is a client error, not
21/// a silent default.
22impl TryFrom<pb::Modality> for Modality {
23 type Error = Status;
24
25 fn try_from(modality: pb::Modality) -> Result<Self, Self::Error> {
26 match modality {
27 pb::Modality::Text => Ok(Modality::Text),
28 pb::Modality::Image => Ok(Modality::Image),
29 pb::Modality::Audio => Ok(Modality::Audio),
30 pb::Modality::Unspecified => {
31 Err(Status::invalid_argument("modality must be specified"))
32 }
33 }
34 }
35}
36
37/// Decode the raw enum discriminant a request carries. An out-of-range value is
38/// rejected with the same message an `UNSPECIFIED` modality is — the request
39/// names no valid tower either way.
40impl TryFrom<i32> for Modality {
41 type Error = Status;
42
43 fn try_from(modality: i32) -> Result<Self, Self::Error> {
44 match pb::Modality::try_from(modality) {
45 Ok(m) => Modality::try_from(m),
46 Err(_) => Err(Status::invalid_argument("modality must be specified")),
47 }
48 }
49}
50
51/// The proto query oneof paired with its resolved [`Modality`]. The oneof alone
52/// does not say which tower it feeds, so decode takes both: TEXT requires
53/// `text`, IMAGE/AUDIO require `data` (raw bytes); a missing oneof or a mismatch
54/// is a client error.
55pub struct ProtoQueryInput {
56 pub input: Option<pb::encode_query_request::Input>,
57 pub modality: Modality,
58}
59
60impl TryFrom<ProtoQueryInput> for QueryInput {
61 type Error = Status;
62
63 fn try_from(value: ProtoQueryInput) -> Result<Self, Self::Error> {
64 use pb::encode_query_request::Input as ProtoInput;
65 let input = value
66 .input
67 .ok_or_else(|| Status::invalid_argument("input (text or data) is required"))?;
68 match (value.modality, input) {
69 (Modality::Text, ProtoInput::Text(text)) => {
70 if text.is_empty() {
71 return Err(Status::invalid_argument("text is required"));
72 }
73 Ok(QueryInput::Text(text))
74 }
75 (Modality::Image | Modality::Audio, ProtoInput::Data(data)) => {
76 if data.is_empty() {
77 return Err(Status::invalid_argument("data is required"));
78 }
79 Ok(QueryInput::Bytes(data))
80 }
81 (Modality::Text, ProtoInput::Data(_)) => Err(Status::invalid_argument(
82 "TEXT modality requires text input, got data",
83 )),
84 (Modality::Image | Modality::Audio, ProtoInput::Text(_)) => Err(
85 Status::invalid_argument("IMAGE/AUDIO modality requires data input, got text"),
86 ),
87 }
88 }
89}
90
91/// Encode the engine's result-table record into the wire `ResultTable`. The
92/// engine's optional `dimensions` is flattened to `0` for a non-embedding /
93/// unset result, `row_count` widens to the wire's `u64`, and `task` rides the
94/// shared [`super::model_task_to_proto`] task vocabulary.
95///
96/// The wire `ResultTable` carries its own `task` (the embedding tower), so the
97/// reconstruction recovers it faithfully from the message itself — never from a
98/// modality threaded in out of band, never a guess.
99impl From<ResultTableRecord> for pb::ResultTable {
100 fn from(record: ResultTableRecord) -> Self {
101 pb::ResultTable {
102 table_name: record.table_name,
103 source_id: record.source_id,
104 model_id: record.model_id,
105 dimensions: record.dimensions.unwrap_or(0),
106 row_count: record.row_count as u64,
107 status: record.status,
108 task: super::model_task_to_proto(record.task) as i32,
109 // A bare record carries no producer cache outcome (a catalog
110 // projection, not a producer return) → `UNSPECIFIED`, the honest
111 // "no producer ran" value. A producer handler uses
112 // [`result_table_with_outcome`] to carry the real outcome.
113 cache_outcome: crate::proto::inference::CacheOutcome::Unspecified as i32,
114 }
115 }
116}
117
118/// Encode a producer's result-table record **with** the cache outcome it
119/// returned, so reuse is observable on the wire — the shape a producer RPC
120/// handler builds, distinct from the bare [`From`] projection (which has no
121/// producer to attribute an outcome to). `outcome` is the wire enum value from
122/// the engine's `CacheOutcome` (`COMPUTED` / `REUSED`).
123pub fn result_table_with_outcome(record: ResultTableRecord, outcome: i32) -> pb::ResultTable {
124 pb::ResultTable {
125 cache_outcome: outcome,
126 ..pb::ResultTable::from(record)
127 }
128}
129
130/// Reconstruct the engine's result-table record from the wire `ResultTable` a
131/// `GenerateEmbeddings` or `DescribeSource` response carries.
132///
133/// The wire message is the client-observable projection: it carries the fields
134/// a client needs to locate and query the persisted embedding table
135/// (`table_name`, `source_id`, `model_id`, `dimensions`, `row_count`, `status`,
136/// `task`). The engine's server-internal bookkeeping — storage/index paths,
137/// timestamps, the originating columns — is intentionally not on the wire, so
138/// the reconstruction leaves those at their "not carried" values (`String::new`
139/// / `None`). A remote consumer keys off the same fields a local one reads back;
140/// the dropped fields are server-side state, not result data. The message is
141/// self-describing in `task`, so an out-of-range/unspecified task is the
142/// faithful `invalid_argument` the shared decoder builds.
143pub fn result_table_from_proto(table: pb::ResultTable) -> Result<ResultTableRecord, Status> {
144 let task = super::model_task_from_proto(table.task)?;
145 Ok(ResultTableRecord {
146 table_name: table.table_name,
147 source_id: table.source_id,
148 model_id: table.model_id,
149 task,
150 // `kind`/`derived_from` are server-internal bookkeeping, not carried on
151 // the wire — `GenerateEmbeddings` only ever returns a model output, so
152 // the reconstruction defaults to that kind.
153 kind: jammi_db::catalog::result_repo::ResultTableKind::Model,
154 derived_from: None,
155 parquet_path: String::new(),
156 index_path: None,
157 dimensions: (table.dimensions != 0).then_some(table.dimensions),
158 distance_metric: String::new(),
159 row_count: table.row_count as usize,
160 status: table.status,
161 key_column: None,
162 text_columns: None,
163 created_at: String::new(),
164 completed_at: None,
165 // Tenant identity is server-side bookkeeping resolved from the session,
166 // not carried on the result wire — the reconstruction leaves it absent.
167 tenant_id: None,
168 // The materialization-contract summary columns are server-side
169 // provenance, not carried on the `GenerateEmbeddings` result wire; a
170 // remote consumer that wants the attestation reaches it through
171 // `verify_materialization`, not this reconstruction.
172 definition_hash: None,
173 input_anchors_json: None,
174 })
175}