use jammi_db::catalog::backend::BackendError;
use jammi_db::catalog::channel_repo::{ChannelCatalogError, ChannelColumnType};
use jammi_db::error::JammiError;
use jammi_db::store::mutable::{MutableTableError, MutableTableId};
use jammi_db::trigger::TriggerError;
use jammi_db::{AuditError, TenantId};
use prost::bytes::Bytes;
use prost::{Message, Name};
use prost_types::Any;
use tonic::{Code, Status};
use uuid::Uuid;
use crate::proto::error as pb;
impl From<&JammiError> for pb::JammiErrorDetail {
fn from(err: &JammiError) -> Self {
use pb::jammi_error_detail::Variant;
let variant = match err {
JammiError::Source { source_id, message } => Variant::Source(pb::SourceError {
source_id: source_id.clone(),
message: message.clone(),
}),
JammiError::Model { model_id, message } => Variant::Model(pb::ModelError {
model_id: model_id.clone(),
message: message.clone(),
}),
JammiError::ModelNotFound { model_id } => {
Variant::ModelNotFound(pb::ModelNotFoundError {
model_id: model_id.clone(),
})
}
JammiError::ModelReferenced {
model_id,
referenced_by,
} => Variant::ModelReferenced(pb::ModelReferencedError {
model_id: model_id.clone(),
referenced_by: referenced_by.clone(),
}),
JammiError::Inference(message) => Variant::Inference(pb::StringError {
message: message.clone(),
}),
JammiError::Catalog(message) => Variant::Catalog(pb::StringError {
message: message.clone(),
}),
JammiError::Schema {
table,
column,
expected,
actual,
} => Variant::Schema(pb::SchemaError {
table: table.clone(),
column: column.clone(),
expected: expected.clone(),
actual: actual.clone(),
}),
JammiError::Config(message) => Variant::Config(pb::StringError {
message: message.clone(),
}),
JammiError::Eval(message) => Variant::Eval(pb::StringError {
message: message.clone(),
}),
JammiError::Tenant(message) => Variant::Tenant(pb::StringError {
message: message.clone(),
}),
JammiError::FineTune(message) => Variant::FineTune(pb::StringError {
message: message.clone(),
}),
JammiError::Gpu(message) => Variant::Gpu(pb::StringError {
message: message.clone(),
}),
JammiError::Backend(message) => Variant::Backend(pb::StringError {
message: message.clone(),
}),
JammiError::ChannelCatalog(e) => Variant::ChannelCatalog(e.into()),
JammiError::ChannelAssembly(message) => Variant::ChannelAssembly(pb::StringError {
message: message.clone(),
}),
JammiError::MutableTable(e) => Variant::MutableTable(e.into()),
other => Variant::Other(pb::StringError {
message: other.to_string(),
}),
};
pb::JammiErrorDetail {
variant: Some(variant),
}
}
}
impl From<pb::JammiErrorDetail> for JammiError {
fn from(detail: pb::JammiErrorDetail) -> Self {
use pb::jammi_error_detail::Variant;
match detail.variant {
Some(Variant::Source(e)) => JammiError::Source {
source_id: e.source_id,
message: e.message,
},
Some(Variant::Model(e)) => JammiError::Model {
model_id: e.model_id,
message: e.message,
},
Some(Variant::ModelNotFound(e)) => JammiError::ModelNotFound {
model_id: e.model_id,
},
Some(Variant::ModelReferenced(e)) => JammiError::ModelReferenced {
model_id: e.model_id,
referenced_by: e.referenced_by,
},
Some(Variant::Inference(e)) => JammiError::Inference(e.message),
Some(Variant::Catalog(e)) => JammiError::Catalog(e.message),
Some(Variant::Schema(e)) => JammiError::Schema {
table: e.table,
column: e.column,
expected: e.expected,
actual: e.actual,
},
Some(Variant::Config(e)) => JammiError::Config(e.message),
Some(Variant::Eval(e)) => JammiError::Eval(e.message),
Some(Variant::Tenant(e)) => JammiError::Tenant(e.message),
Some(Variant::FineTune(e)) => JammiError::FineTune(e.message),
Some(Variant::Gpu(e)) => JammiError::Gpu(e.message),
Some(Variant::Backend(e)) => JammiError::Backend(e.message),
Some(Variant::ChannelCatalog(e)) => JammiError::ChannelCatalog(e.into()),
Some(Variant::ChannelAssembly(e)) => JammiError::ChannelAssembly(e.message),
Some(Variant::MutableTable(e)) => JammiError::MutableTable(e.into()),
Some(Variant::Other(e)) => JammiError::Other(e.message),
None => JammiError::Other(String::new()),
}
}
}
/// Encodes a borrowed [`MutableTableError`] as its protobuf detail message.
///
/// Table ids are sent as their `Display` string, `NoOrderColumn` as the marker
/// value `true`, and a nested backend error through its own conversion.
impl From<&MutableTableError> for pb::MutableTableErrorDetail {
    fn from(err: &MutableTableError) -> Self {
        use pb::mutable_table_error_detail::Variant;

        Self {
            variant: Some(match err {
                MutableTableError::InvalidId(text) => Variant::InvalidId(text.clone()),
                MutableTableError::Schema(text) => Variant::Schema(text.clone()),
                MutableTableError::MissingPrimaryKey(text) => {
                    Variant::MissingPrimaryKey(text.clone())
                }
                MutableTableError::ReservedColumn(text) => Variant::ReservedColumn(text.clone()),
                MutableTableError::NotFound(table) => Variant::NotFound(table.to_string()),
                MutableTableError::AlreadyExists(table) => {
                    Variant::AlreadyExists(table.to_string())
                }
                MutableTableError::NoOrderColumn => Variant::NoOrderColumn(true),
                MutableTableError::Backend(source) => Variant::Backend(source.into()),
            }),
        }
    }
}
/// Decodes a protobuf detail back into a [`MutableTableError`].
///
/// Id-carrying cases re-validate the string through `MutableTableId::new`; a
/// string that fails validation is reported as `InvalidId` holding that raw
/// string. An unset oneof becomes `Schema` with an empty message.
impl From<pb::MutableTableErrorDetail> for MutableTableError {
    fn from(detail: pb::MutableTableErrorDetail) -> Self {
        use pb::mutable_table_error_detail::Variant;

        /// Rebuilds an id-carrying variant, falling back to `InvalidId` when
        /// the wire string is not accepted as a table id.
        fn with_id(
            raw: String,
            wrap: fn(MutableTableId) -> MutableTableError,
        ) -> MutableTableError {
            match MutableTableId::new(&raw) {
                Ok(table) => wrap(table),
                Err(_) => MutableTableError::InvalidId(raw),
            }
        }

        let variant = match detail.variant {
            Some(variant) => variant,
            None => return Self::Schema(String::new()),
        };

        match variant {
            Variant::InvalidId(text) => Self::InvalidId(text),
            Variant::Schema(text) => Self::Schema(text),
            Variant::MissingPrimaryKey(text) => Self::MissingPrimaryKey(text),
            Variant::ReservedColumn(text) => Self::ReservedColumn(text),
            Variant::NotFound(raw) => with_id(raw, MutableTableError::NotFound),
            Variant::AlreadyExists(raw) => with_id(raw, MutableTableError::AlreadyExists),
            // The boolean is only a presence marker; its value is ignored.
            Variant::NoOrderColumn(_) => Self::NoOrderColumn,
            Variant::Backend(wire) => Self::Backend(wire.into()),
        }
    }
}
/// Encodes a borrowed [`ChannelCatalogError`] as its protobuf detail message.
///
/// Column types are sent as the string returned by their `as_str()`.
impl From<&ChannelCatalogError> for pb::ChannelCatalogErrorDetail {
    fn from(err: &ChannelCatalogError) -> Self {
        use pb::channel_catalog_error_detail::Variant;

        Self {
            variant: Some(match err {
                ChannelCatalogError::AlreadyExists(name) => Variant::AlreadyExists(name.clone()),
                ChannelCatalogError::NotRegistered(name) => Variant::NotRegistered(name.clone()),
                ChannelCatalogError::ColumnAlreadyDeclared {
                    channel,
                    column,
                    ty,
                } => Variant::ColumnAlreadyDeclared(pb::ColumnAlreadyDeclared {
                    channel: channel.clone(),
                    column: column.clone(),
                    ty: ty.as_str().to_string(),
                }),
                ChannelCatalogError::ColumnConflict {
                    channel,
                    column,
                    existing,
                    requested,
                } => Variant::ColumnConflict(pb::ColumnConflict {
                    channel: channel.clone(),
                    column: column.clone(),
                    existing: existing.as_str().to_string(),
                    requested: requested.as_str().to_string(),
                }),
                ChannelCatalogError::InvalidId(text) => Variant::InvalidId(text.clone()),
                ChannelCatalogError::InvalidColumnType(text) => {
                    Variant::InvalidColumnType(text.clone())
                }
            }),
        }
    }
}
impl From<pb::ChannelCatalogErrorDetail> for ChannelCatalogError {
fn from(detail: pb::ChannelCatalogErrorDetail) -> Self {
use pb::channel_catalog_error_detail::Variant;
let parse_ty = |token: String| ChannelColumnType::from_sql_str(&token).map_err(|_| token);
match detail.variant {
Some(Variant::AlreadyExists(c)) => ChannelCatalogError::AlreadyExists(c),
Some(Variant::NotRegistered(c)) => ChannelCatalogError::NotRegistered(c),
Some(Variant::ColumnAlreadyDeclared(d)) => match parse_ty(d.ty) {
Ok(ty) => ChannelCatalogError::ColumnAlreadyDeclared {
channel: d.channel,
column: d.column,
ty,
},
Err(token) => ChannelCatalogError::InvalidColumnType(token),
},
Some(Variant::ColumnConflict(d)) => match (parse_ty(d.existing), parse_ty(d.requested))
{
(Ok(existing), Ok(requested)) => ChannelCatalogError::ColumnConflict {
channel: d.channel,
column: d.column,
existing,
requested,
},
(Err(token), _) | (_, Err(token)) => ChannelCatalogError::InvalidColumnType(token),
},
Some(Variant::InvalidId(m)) => ChannelCatalogError::InvalidId(m),
Some(Variant::InvalidColumnType(m)) => ChannelCatalogError::InvalidColumnType(m),
None => ChannelCatalogError::NotRegistered(String::new()),
}
}
}
/// Encodes a borrowed [`BackendError`] as its protobuf detail message.
///
/// Optional tenants are sent through [`tenant_to_wire`], so `None` becomes the
/// empty string. The `Sqlx` variant is sent as its `Display` string.
impl From<&BackendError> for pb::BackendErrorDetail {
    fn from(err: &BackendError) -> Self {
        use pb::backend_error_detail::Variant;
        let variant = match err {
            BackendError::Execution(m) => Variant::Execution(m.clone()),
            BackendError::Constraint { table, detail } => {
                Variant::Constraint(pb::ConstraintViolation {
                    table: table.clone(),
                    detail: detail.clone(),
                })
            }
            BackendError::Unavailable(m) => Variant::Unavailable(m.clone()),
            BackendError::Retry(m) => Variant::Retry(m.clone()),
            BackendError::Migration(m) => Variant::Migration(m.clone()),
            BackendError::TypeConversion { column, detail } => {
                Variant::TypeConversion(pb::TypeConversion {
                    column: column.clone(),
                    detail: detail.clone(),
                })
            }
            BackendError::TenantMismatch {
                table,
                expected,
                got,
            } => Variant::TenantMismatch(pb::TenantMismatch {
                table: table.clone(),
                // Same wire encoding the trigger conversions use.
                expected: tenant_to_wire(*expected),
                got: tenant_to_wire(*got),
            }),
            BackendError::Sqlx(e) => Variant::Sqlx(e.to_string()),
        };
        pb::BackendErrorDetail {
            variant: Some(variant),
        }
    }
}

/// Decodes a protobuf detail back into a [`BackendError`].
///
/// Tenant strings go through [`parse_optional_tenant`]. The `Sqlx` case is
/// rebuilt as `Execution` carrying the transmitted string, and an unset oneof
/// becomes `Execution` with an empty message.
impl From<pb::BackendErrorDetail> for BackendError {
    fn from(detail: pb::BackendErrorDetail) -> Self {
        use pb::backend_error_detail::Variant;
        match detail.variant {
            Some(Variant::Execution(m)) => BackendError::Execution(m),
            Some(Variant::Constraint(c)) => BackendError::Constraint {
                table: c.table,
                detail: c.detail,
            },
            Some(Variant::Unavailable(m)) => BackendError::Unavailable(m),
            Some(Variant::Retry(m)) => BackendError::Retry(m),
            Some(Variant::Migration(m)) => BackendError::Migration(m),
            Some(Variant::TypeConversion(t)) => BackendError::TypeConversion {
                column: t.column,
                detail: t.detail,
            },
            Some(Variant::TenantMismatch(t)) => BackendError::TenantMismatch {
                table: t.table,
                expected: parse_optional_tenant(t.expected),
                got: parse_optional_tenant(t.got),
            },
            Some(Variant::Sqlx(m)) => BackendError::Execution(m),
            None => BackendError::Execution(String::new()),
        }
    }
}

/// Renders an optional tenant for the wire: `Some` becomes the tenant's
/// `Display` string and `None` becomes the empty string.
fn tenant_to_wire(t: Option<TenantId>) -> String {
    t.map(|t| t.to_string()).unwrap_or_default()
}

/// Parses a tenant string received from the wire.
///
/// The empty string yields `None`. A non-empty string that fails to parse also
/// yields `None`, because the parse error is discarded.
fn parse_optional_tenant(s: String) -> Option<TenantId> {
    if s.is_empty() {
        None
    } else {
        s.parse().ok()
    }
}
/// Encodes a borrowed [`TriggerError`] as its protobuf detail message.
///
/// Optional tenants are sent through `tenant_to_wire`; nested mutable-table
/// and backend errors delegate to their own conversions.
impl From<&TriggerError> for pb::TriggerErrorDetail {
    fn from(err: &TriggerError) -> Self {
        use pb::trigger_error_detail::Variant;

        Self {
            variant: Some(match err {
                TriggerError::TopicNotFound(text) => Variant::TopicNotFound(text.clone()),
                TriggerError::SchemaConflict { topic, detail } => {
                    Variant::SchemaConflict(pb::SchemaConflict {
                        topic: topic.clone(),
                        detail: detail.clone(),
                    })
                }
                TriggerError::UnsupportedSchemaType { column, data_type } => {
                    Variant::UnsupportedSchemaType(pb::UnsupportedSchemaType {
                        column: column.clone(),
                        data_type: data_type.clone(),
                    })
                }
                TriggerError::BatchSchemaMismatch(text) => {
                    Variant::BatchSchemaMismatch(text.clone())
                }
                TriggerError::PublishTenantMismatch {
                    topic,
                    topic_tenant,
                    publish_tenant,
                } => Variant::PublishTenantMismatch(pb::PublishTenantMismatch {
                    topic: topic.clone(),
                    topic_tenant: tenant_to_wire(*topic_tenant),
                    publish_tenant: tenant_to_wire(*publish_tenant),
                }),
                TriggerError::PredicateParse(text) => Variant::PredicateParse(text.clone()),
                TriggerError::PredicateEval(text) => Variant::PredicateEval(text.clone()),
                TriggerError::PredicateUnsupported(text) => {
                    Variant::PredicateUnsupported(text.clone())
                }
                TriggerError::OffsetEvicted(offset) => Variant::OffsetEvicted(*offset),
                TriggerError::BackingTable(source) => Variant::BackingTable(source.into()),
                TriggerError::Backend(source) => Variant::Backend(source.into()),
                TriggerError::Driver(text) => Variant::Driver(text.clone()),
                TriggerError::Catalog(text) => Variant::Catalog(text.clone()),
            }),
        }
    }
}
impl From<pb::TriggerErrorDetail> for TriggerError {
fn from(detail: pb::TriggerErrorDetail) -> Self {
use pb::trigger_error_detail::Variant;
match detail.variant {
Some(Variant::TopicNotFound(m)) => TriggerError::TopicNotFound(m),
Some(Variant::SchemaConflict(c)) => TriggerError::SchemaConflict {
topic: c.topic,
detail: c.detail,
},
Some(Variant::UnsupportedSchemaType(u)) => TriggerError::UnsupportedSchemaType {
column: u.column,
data_type: u.data_type,
},
Some(Variant::BatchSchemaMismatch(m)) => TriggerError::BatchSchemaMismatch(m),
Some(Variant::PublishTenantMismatch(p)) => TriggerError::PublishTenantMismatch {
topic: p.topic,
topic_tenant: parse_optional_tenant(p.topic_tenant),
publish_tenant: parse_optional_tenant(p.publish_tenant),
},
Some(Variant::PredicateParse(m)) => TriggerError::PredicateParse(m),
Some(Variant::PredicateEval(m)) => TriggerError::PredicateEval(m),
Some(Variant::PredicateUnsupported(m)) => TriggerError::PredicateUnsupported(m),
Some(Variant::OffsetEvicted(n)) => TriggerError::OffsetEvicted(n),
Some(Variant::BackingTable(e)) => TriggerError::BackingTable(e.into()),
Some(Variant::Backend(e)) => TriggerError::Backend(e.into()),
Some(Variant::Driver(m)) => TriggerError::Driver(m),
Some(Variant::Catalog(m)) => TriggerError::Catalog(m),
None => TriggerError::Catalog(String::new()),
}
}
}
/// Encodes a borrowed [`AuditError`] as its protobuf detail message.
///
/// Counts are cast to `u64`, the signature-mismatch id is sent as its
/// `Display` string, `NoTenantBinding` as the marker value `true`, and the
/// `Serde` variant as the inner error's `Display` string.
impl From<&AuditError> for pb::AuditErrorDetail {
    fn from(err: &AuditError) -> Self {
        use pb::audit_error_detail::Variant;

        Self {
            variant: Some(match err {
                AuditError::LengthMismatch { ids, scores } => {
                    Variant::LengthMismatch(pb::LengthMismatch {
                        ids: *ids as u64,
                        scores: *scores as u64,
                    })
                }
                AuditError::LineageTooLarge { actual, max } => {
                    Variant::LineageTooLarge(pb::LineageTooLarge {
                        actual: *actual as u64,
                        max: *max as u64,
                    })
                }
                AuditError::NoTenantBinding => Variant::NoTenantBinding(true),
                AuditError::SignatureMismatch(record) => {
                    Variant::SignatureMismatch(record.to_string())
                }
                AuditError::MasterKey(text) => Variant::MasterKey(text.clone()),
                AuditError::Serde(source) => Variant::Serde(source.to_string()),
                AuditError::Storage(text) => Variant::Storage(text.clone()),
                AuditError::Broker(text) => Variant::Broker(text.clone()),
            }),
        }
    }
}
/// Decodes a protobuf detail back into an [`AuditError`].
///
/// * Counts arrive as `u64` and are converted to `usize` with saturation, so a
///   value too large for the local `usize` becomes `usize::MAX` instead of
///   being silently truncated.
/// * A signature-mismatch id that is not a valid UUID falls back to
///   `Uuid::default()`.
/// * The `Serde` case is rebuilt as `Storage` carrying the transmitted string.
/// * An unset oneof becomes `Storage` with an empty message.
impl From<pb::AuditErrorDetail> for AuditError {
    fn from(detail: pb::AuditErrorDetail) -> Self {
        use pb::audit_error_detail::Variant;

        /// Narrows a wire count to `usize`, clamping instead of wrapping.
        /// The fully qualified path keeps this edition-independent.
        fn saturating_usize(n: u64) -> usize {
            <usize as std::convert::TryFrom<u64>>::try_from(n).unwrap_or(usize::MAX)
        }

        match detail.variant {
            Some(Variant::LengthMismatch(l)) => AuditError::LengthMismatch {
                ids: saturating_usize(l.ids),
                scores: saturating_usize(l.scores),
            },
            Some(Variant::LineageTooLarge(l)) => AuditError::LineageTooLarge {
                actual: saturating_usize(l.actual),
                max: saturating_usize(l.max),
            },
            // The boolean is only a presence marker; its value is ignored.
            Some(Variant::NoTenantBinding(_)) => AuditError::NoTenantBinding,
            Some(Variant::SignatureMismatch(id)) => {
                AuditError::SignatureMismatch(Uuid::parse_str(&id).unwrap_or_default())
            }
            Some(Variant::MasterKey(m)) => AuditError::MasterKey(m),
            Some(Variant::Serde(m)) => AuditError::Storage(m),
            Some(Variant::Storage(m)) => AuditError::Storage(m),
            Some(Variant::Broker(m)) => AuditError::Broker(m),
            None => AuditError::Storage(String::new()),
        }
    }
}
/// Builds a [`Status`] whose details bytes hold an encoded `pb::RpcStatus`
/// envelope wrapping `detail`.
///
/// If `detail` cannot be packed into an `Any`, the envelope is still sent but
/// with an empty `details` list.
fn attach_detail<M>(code: Code, message: String, detail: &M) -> Status
where
    M: Message + Name,
{
    let packed = match Any::from_msg(detail) {
        Ok(any) => vec![any],
        Err(_) => Vec::new(),
    };
    let envelope = pb::RpcStatus {
        code: code as i32,
        message: message.clone(),
        details: packed,
    };
    let payload = Bytes::from(envelope.encode_to_vec());
    Status::with_details(code, message, payload)
}
/// Recovers a typed detail message of type `M` from a [`Status`].
///
/// Returns `None` when the status carries no details bytes, when those bytes
/// do not decode as a `pb::RpcStatus` envelope, when no entry has `M`'s type
/// URL, or when the first entry with that URL fails to unpack.
fn extract_detail<M>(status: &Status) -> Option<M>
where
    M: Message + Name + Default,
{
    let raw = status.details();
    if raw.is_empty() {
        return None;
    }

    let envelope = pb::RpcStatus::decode(raw).ok()?;
    let wanted = M::type_url();

    // Only the first entry with a matching type URL is considered.
    for any in &envelope.details {
        if any.type_url == wanted {
            return any.to_msg::<M>().ok();
        }
    }
    None
}
/// Builds a [`Status`] with the given code and message that also carries
/// `err` encoded as a `pb::JammiErrorDetail`.
pub fn attach_error_detail(code: Code, message: String, err: &JammiError) -> Status {
    let detail = pb::JammiErrorDetail::from(err);
    attach_detail(code, message, &detail)
}
/// Rebuilds a [`JammiError`] from a [`Status`].
///
/// Uses the attached `pb::JammiErrorDetail` when one can be extracted;
/// otherwise returns `JammiError::Other` holding the status message.
pub fn error_from_status(status: &Status) -> JammiError {
    if let Some(detail) = extract_detail::<pb::JammiErrorDetail>(status) {
        return JammiError::from(detail);
    }
    JammiError::Other(status.message().to_string())
}
/// Builds a [`Status`] with the given code and message that also carries
/// `err` encoded as a `pb::TriggerErrorDetail`.
pub fn attach_trigger_detail(code: Code, message: String, err: &TriggerError) -> Status {
    let detail = pb::TriggerErrorDetail::from(err);
    attach_detail(code, message, &detail)
}
/// Rebuilds a [`TriggerError`] from a [`Status`].
///
/// Uses the attached `pb::TriggerErrorDetail` when one can be extracted;
/// otherwise returns `TriggerError::Driver` holding the status message.
pub fn trigger_error_from_status(status: &Status) -> TriggerError {
    if let Some(detail) = extract_detail::<pb::TriggerErrorDetail>(status) {
        return TriggerError::from(detail);
    }
    TriggerError::Driver(status.message().to_string())
}
/// Builds a [`Status`] with the given code and message that also carries
/// `err` encoded as a `pb::AuditErrorDetail`.
pub fn attach_audit_detail(code: Code, message: String, err: &AuditError) -> Status {
    let detail = pb::AuditErrorDetail::from(err);
    attach_detail(code, message, &detail)
}
/// Rebuilds an [`AuditError`] from a [`Status`].
///
/// Uses the attached `pb::AuditErrorDetail` when one can be extracted;
/// otherwise returns `AuditError::Storage` holding the status message.
pub fn audit_error_from_status(status: &Status) -> AuditError {
    if let Some(detail) = extract_detail::<pb::AuditErrorDetail>(status) {
        return AuditError::from(detail);
    }
    AuditError::Storage(status.message().to_string())
}
#[cfg(test)]
mod tests {
use super::*;
/// Sends `err` through a `Code::Internal` status and decodes it again.
fn round_trip(err: &JammiError) -> JammiError {
    error_from_status(&attach_error_detail(
        Code::Internal,
        err.to_string(),
        err,
    ))
}
/// Each listed variant must come back with the same discriminant and the
/// same `Display` output after a round trip.
#[test]
fn every_owned_shape_variant_round_trips_to_itself() {
    let samples = [
        JammiError::Config("missing api key".into()),
        JammiError::Catalog("no embedding table for source".into()),
        JammiError::Source {
            source_id: "patents".into(),
            message: "scan failed".into(),
        },
        JammiError::Model {
            model_id: "local:/models/tiny_bert".into(),
            message: "Model directory does not exist".into(),
        },
        JammiError::ModelNotFound {
            model_id: "local:/models/tiny_bert".into(),
        },
        JammiError::ModelReferenced {
            model_id: "local:/models/tiny_bert".into(),
            referenced_by: vec!["result_tables".into(), "training_jobs.base_model_id".into()],
        },
        JammiError::Inference("encode_query forward: shape mismatch".into()),
        JammiError::FineTune("checkpoint epoch 3 diverged".into()),
        JammiError::Eval("golden NER fixture row 7 mismatch".into()),
        JammiError::Gpu("no CUDA device visible".into()),
        JammiError::Backend("vLLM returned HTTP 503".into()),
        JammiError::Tenant("nil UUID is not a valid tenant".into()),
        JammiError::ChannelAssembly(
            "batch 0: channel 'vector' column 'similarity' has dtype Int32".into(),
        ),
        JammiError::Schema {
            table: "patents_embeddings".into(),
            column: "vector".into(),
            expected: "FixedSizeList<Float32>".into(),
            actual: "missing".into(),
        },
        JammiError::Other("an error with no more specific shape".into()),
    ];

    for err in samples.iter() {
        let back = round_trip(err);
        assert_eq!(
            std::mem::discriminant(&back),
            std::mem::discriminant(err),
            "owned-shape variant must reconstruct as itself: {err:?} -> {back:?}"
        );
        assert_eq!(
            back.to_string(),
            err.to_string(),
            "owned-shape variant must reconstruct its fields faithfully: {err:?} -> {back:?}"
        );
    }
}
/// An `Io` variant must come back as `Other` whose message equals the
/// original error's `Display` output.
#[test]
fn foreign_source_variant_folds_to_other_with_faithful_display() {
    let inner = std::io::Error::new(std::io::ErrorKind::NotFound, "model.safetensors not found");
    let io_err = JammiError::Io(inner);
    let expected = io_err.to_string();

    match round_trip(&io_err) {
        JammiError::Other(message) => assert_eq!(
            message, expected,
            "the foreign-source fold carries the faithful Display string"
        ),
        other => panic!("a foreign-source variant must fold to Other, got {other:?}"),
    }
}
/// Every `MutableTableError` wrapped in `JammiError::MutableTable` must
/// come back as `MutableTable` with identical `Display` output.
#[test]
fn mutable_table_variant_round_trips_faithfully() {
    use jammi_db::catalog::backend::BackendError;
    use jammi_db::store::mutable::{MutableTableError, MutableTableId};

    let table_id = MutableTableId::new("patents_dim").expect("valid id");
    let inners = [
        MutableTableError::InvalidId(
            "table name '_jammi_audit' is reserved for the Jammi substrate".into(),
        ),
        MutableTableError::Schema("order_column 'seq' not in schema".into()),
        MutableTableError::MissingPrimaryKey("row_key".into()),
        MutableTableError::ReservedColumn("tenant_id".into()),
        MutableTableError::NotFound(table_id.clone()),
        MutableTableError::AlreadyExists(table_id.clone()),
        MutableTableError::NoOrderColumn,
        MutableTableError::Backend(BackendError::Constraint {
            table: "patents_dim".into(),
            detail: "duplicate key value violates unique constraint".into(),
        }),
        MutableTableError::Backend(BackendError::TenantMismatch {
            table: "patents_dim".into(),
            expected: Some(
                "01906c83-d4c8-7e10-9c4f-3b6f7c5a8e9a"
                    .parse()
                    .expect("uuid"),
            ),
            got: None,
        }),
    ];

    for inner in inners {
        let err = JammiError::MutableTable(inner);
        let back = round_trip(&err);

        // Both sides must be the `MutableTable` variant.
        let other = (&err, &back);
        if !matches!(
            other,
            (JammiError::MutableTable(_), JammiError::MutableTable(_))
        ) {
            panic!("MutableTable must reconstruct as itself, never fold to Other: {other:?}");
        }

        assert_eq!(
            back.to_string(),
            err.to_string(),
            "MutableTable variant must reconstruct its fields faithfully: {err:?} -> {back:?}"
        );
    }
}
/// Every `ChannelCatalogError` wrapped in `JammiError::ChannelCatalog`
/// must come back as `ChannelCatalog` with identical `Display` output.
#[test]
fn channel_catalog_variant_round_trips_faithfully() {
    let inners = [
        ChannelCatalogError::AlreadyExists("scored_by".into()),
        ChannelCatalogError::NotRegistered("vector".into()),
        ChannelCatalogError::ColumnAlreadyDeclared {
            channel: "scored_by".into(),
            column: "ranker".into(),
            ty: ChannelColumnType::Utf8,
        },
        ChannelCatalogError::ColumnConflict {
            channel: "scored_by".into(),
            column: "ranker".into(),
            existing: ChannelColumnType::Utf8,
            requested: ChannelColumnType::Int32,
        },
        ChannelCatalogError::InvalidId("invalid channel id 'Bad': must be [a-z0-9_]".into()),
        ChannelCatalogError::InvalidColumnType("Decimal".into()),
    ];

    for inner in inners {
        let err = JammiError::ChannelCatalog(inner);
        let back = round_trip(&err);

        // Both sides must be the `ChannelCatalog` variant.
        let other = (&err, &back);
        if !matches!(
            other,
            (JammiError::ChannelCatalog(_), JammiError::ChannelCatalog(_))
        ) {
            panic!("ChannelCatalog must reconstruct as itself, never fold to Other: {other:?}");
        }

        assert_eq!(
            back.to_string(),
            err.to_string(),
            "ChannelCatalog variant must reconstruct its fields faithfully: {err:?} -> {back:?}"
        );
    }

    // The requested column type must still appear in the rendered message
    // after the round trip.
    let conflict = JammiError::ChannelCatalog(ChannelCatalogError::ColumnConflict {
        channel: "scored_by".into(),
        column: "ranker".into(),
        existing: ChannelColumnType::Utf8,
        requested: ChannelColumnType::Int32,
    });
    assert!(round_trip(&conflict)
        .to_string()
        .contains("cannot redeclare as Int32"));
}
/// Sends `err` through a `Code::Internal` status and decodes it again.
fn round_trip_trigger(err: &TriggerError) -> TriggerError {
    trigger_error_from_status(&attach_trigger_detail(
        Code::Internal,
        err.to_string(),
        err,
    ))
}
/// Each listed `TriggerError` must come back with the same discriminant
/// and the same `Display` output after a round trip.
#[test]
fn every_trigger_variant_round_trips_to_itself() {
    let tenant_a: TenantId = "01906c83-d4c8-7e10-9c4f-3b6f7c5a8e9a"
        .parse()
        .expect("uuid");

    let samples = [
        TriggerError::TopicNotFound("events.changes".into()),
        TriggerError::SchemaConflict {
            topic: "events.changes".into(),
            detail: "column 'kind' type changed Utf8 -> Int64".into(),
        },
        TriggerError::UnsupportedSchemaType {
            column: "payload".into(),
            data_type: "Struct".into(),
        },
        TriggerError::BatchSchemaMismatch("publish has 3 columns, topic has 2".into()),
        TriggerError::PublishTenantMismatch {
            topic: "events.changes".into(),
            topic_tenant: Some(tenant_a),
            publish_tenant: None,
        },
        TriggerError::PredicateParse("unexpected token at column 4".into()),
        TriggerError::PredicateEval("predicate did not produce Boolean array".into()),
        TriggerError::PredicateUnsupported("aggregate functions are not allowed".into()),
        TriggerError::OffsetEvicted(42),
        TriggerError::BackingTable(MutableTableError::AlreadyExists(
            MutableTableId::new("__topic_abc").expect("valid id"),
        )),
        TriggerError::Backend(BackendError::Constraint {
            table: "topics".into(),
            detail: "duplicate key value violates unique constraint".into(),
        }),
        TriggerError::Driver("nats: connection closed".into()),
        TriggerError::Catalog("topic_id parse: invalid".into()),
    ];

    for err in samples.iter() {
        let back = round_trip_trigger(err);
        assert_eq!(
            std::mem::discriminant(&back),
            std::mem::discriminant(err),
            "TriggerError variant must reconstruct as itself: {err:?} -> {back:?}"
        );
        assert_eq!(
            back.to_string(),
            err.to_string(),
            "TriggerError variant must reconstruct its fields faithfully: {err:?} -> {back:?}"
        );
    }
}
/// Sends `err` through a `Code::Internal` status and decodes it again.
fn round_trip_audit(err: &AuditError) -> AuditError {
    audit_error_from_status(&attach_audit_detail(
        Code::Internal,
        err.to_string(),
        err,
    ))
}
/// Each listed `AuditError` must come back with the same discriminant and
/// the same `Display` output after a round trip.
#[test]
fn every_owned_shape_audit_variant_round_trips_to_itself() {
    let samples = [
        AuditError::LengthMismatch { ids: 3, scores: 2 },
        AuditError::LineageTooLarge {
            actual: 70_000,
            max: 65_536,
        },
        AuditError::NoTenantBinding,
        AuditError::SignatureMismatch(
            "01906c83-d4c8-7e10-9c4f-3b6f7c5a8e9a"
                .parse()
                .expect("uuid"),
        ),
        AuditError::MasterKey("expected 64 hex chars, got 10".into()),
        AuditError::Storage("mutable-table registry unavailable".into()),
        AuditError::Broker("audit topic publish failed".into()),
    ];

    for err in samples.iter() {
        let back = round_trip_audit(err);
        assert_eq!(
            std::mem::discriminant(&back),
            std::mem::discriminant(err),
            "AuditError variant must reconstruct as itself: {err:?} -> {back:?}"
        );
        assert_eq!(
            back.to_string(),
            err.to_string(),
            "AuditError variant must reconstruct its fields faithfully: {err:?} -> {back:?}"
        );
    }
}
/// An `AuditError::Serde` must come back as `Storage` whose message equals
/// the inner serde error's `Display` output.
#[test]
fn audit_serde_leaf_folds_to_faithful_string() {
    let parse_failure = serde_json::from_str::<serde_json::Value>("{not json")
        .expect_err("malformed JSON must fail to parse");
    let expected = parse_failure.to_string();

    let decoded = round_trip_audit(&AuditError::Serde(parse_failure));
    match decoded {
        AuditError::Storage(message) => assert_eq!(
            message, expected,
            "the foreign serde leaf carries the inner error's faithful Display string"
        ),
        other => panic!("a foreign serde leaf must fold to Storage, got {other:?}"),
    }
}
}