use std::collections::BTreeMap;
use std::str::FromStr;
use jammi_db::catalog::model_repo::ModelDescriptor;
use jammi_db::catalog::source_repo::SourceDescriptor;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
use jammi_db::trigger::ids::TopicId;
use jammi_db::trigger::TopicDefinition;
use jammi_db::TenantId;
use tonic::Status;
use crate::proto::catalog as pb;
use crate::proto::embedding as embedding_pb;
use crate::{decode_ipc_schema, encode_ipc_stream, result_table_from_proto};
/// Decodes a raw wire `SourceKind` value into the domain [`SourceType`].
///
/// # Errors
///
/// Returns `Status::invalid_argument` when `kind` is `Unspecified` or is not
/// a value `pb::SourceKind` recognises.
pub fn source_type_from_proto(kind: i32) -> Result<SourceType, Status> {
    let source_type = match pb::SourceKind::try_from(kind) {
        Ok(pb::SourceKind::File) => SourceType::File,
        Ok(pb::SourceKind::Postgres) => SourceType::Postgres,
        Ok(pb::SourceKind::Mysql) => SourceType::Mysql,
        // Both "explicitly unspecified" and "unknown number" are rejected alike.
        Ok(pb::SourceKind::Unspecified) | Err(_) => {
            return Err(Status::invalid_argument("source_kind must be specified"));
        }
    };
    Ok(source_type)
}
pub fn source_type_to_proto(source_type: SourceType) -> pb::SourceKind {
match source_type {
SourceType::File => pb::SourceKind::File,
SourceType::Postgres => pb::SourceKind::Postgres,
SourceType::Mysql => pb::SourceKind::Mysql,
}
}
/// Decodes a wire `SourceConnection` into the domain connection type.
///
/// An empty `url` on the wire is treated as "no URL" (`None`). The format is
/// decoded by `file_format_from_proto`. Every remaining field of the domain
/// struct is filled from `Default::default()`.
impl TryFrom<pb::SourceConnection> for SourceConnection {
    type Error = Status;

    fn try_from(conn: pb::SourceConnection) -> Result<Self, Self::Error> {
        // Keep the URL only when it is non-empty.
        let url = Some(conn.url).filter(|candidate| !candidate.is_empty());
        let format = file_format_from_proto(conn.format)?;

        Ok(Self {
            url,
            format,
            ..Default::default()
        })
    }
}
impl From<SourceConnection> for pb::SourceConnection {
fn from(conn: SourceConnection) -> Self {
pb::SourceConnection {
url: conn.url.unwrap_or_default(),
format: file_format_to_proto(conn.format) as i32,
}
}
}
/// Maps an optional domain [`FileFormat`] onto the wire `FileFormat`.
///
/// `None` is encoded as `Unspecified`.
fn file_format_to_proto(format: Option<FileFormat>) -> pb::FileFormat {
    format.map_or(pb::FileFormat::Unspecified, |known| match known {
        FileFormat::Parquet => pb::FileFormat::Parquet,
        FileFormat::Csv => pb::FileFormat::Csv,
        FileFormat::Json => pb::FileFormat::Json,
        FileFormat::Avro => pb::FileFormat::Avro,
    })
}
/// Decodes a raw wire `FileFormat` value into an optional domain [`FileFormat`].
///
/// `Unspecified` and values that `pb::FileFormat` does not recognise both
/// decode to `None`. As written, this function never returns `Err`; the
/// `Result` return type is kept so callers can use `?`.
fn file_format_from_proto(format: i32) -> Result<Option<FileFormat>, Status> {
    let decoded = match pb::FileFormat::try_from(format) {
        Ok(pb::FileFormat::Parquet) => Some(FileFormat::Parquet),
        Ok(pb::FileFormat::Csv) => Some(FileFormat::Csv),
        Ok(pb::FileFormat::Json) => Some(FileFormat::Json),
        Ok(pb::FileFormat::Avro) => Some(FileFormat::Avro),
        Ok(pb::FileFormat::Unspecified) | Err(_) => None,
    };
    Ok(decoded)
}
/// Encodes a domain [`SourceDescriptor`] for the wire.
///
/// The source type is sent as its `SourceKind` number, and each result table
/// is converted through `embedding_pb::ResultTable::from`.
impl From<SourceDescriptor> for pb::SourceDescriptor {
    fn from(descriptor: SourceDescriptor) -> Self {
        let kind = source_type_to_proto(descriptor.source_type) as i32;
        let result_tables = descriptor
            .result_tables
            .into_iter()
            .map(embedding_pb::ResultTable::from)
            .collect();

        Self {
            source_id: descriptor.source_id,
            kind,
            status: descriptor.status,
            result_tables,
        }
    }
}
/// Decodes a wire `SourceDescriptor` into the domain [`SourceDescriptor`].
///
/// # Errors
///
/// Fails with the error from `source_type_from_proto` if `kind` is invalid,
/// otherwise with the first error produced by `result_table_from_proto`.
pub fn source_descriptor_from_proto(
    descriptor: pb::SourceDescriptor,
) -> Result<SourceDescriptor, Status> {
    let pb::SourceDescriptor {
        source_id,
        kind,
        status,
        result_tables,
    } = descriptor;

    // The kind is validated before the tables so error precedence is stable.
    let source_type = source_type_from_proto(kind)?;
    let result_tables = result_tables
        .into_iter()
        .map(result_table_from_proto)
        .collect::<Result<_, Status>>()?;

    Ok(SourceDescriptor {
        source_id,
        source_type,
        status,
        result_tables,
    })
}
pub fn model_to_proto(descriptor: &ModelDescriptor) -> pb::Model {
pb::Model {
model_id: descriptor.model_id.clone(),
backend: descriptor.backend.clone(),
task: super::model_task_to_proto(descriptor.task) as i32,
status: descriptor.status.clone(),
}
}
/// Decodes a wire `Model` into the domain [`ModelDescriptor`].
///
/// # Errors
///
/// Propagates whatever `super::model_task_from_proto` returns for an invalid
/// `task` value.
pub fn model_from_proto(model: pb::Model) -> Result<ModelDescriptor, Status> {
    super::model_task_from_proto(model.task).map(|task| ModelDescriptor {
        model_id: model.model_id,
        backend: model.backend,
        task,
        status: model.status,
    })
}
/// Encodes a borrowed [`TopicDefinition`] as a wire `Topic`.
///
/// The schema is serialised with `encode_ipc_stream`, passing an empty slice
/// as its second argument. A topic without a tenant is sent with an empty
/// `tenant_id` string.
///
/// # Errors
///
/// Propagates any failure from `encode_ipc_stream`.
pub fn topic_to_proto(topic: &TopicDefinition) -> Result<pb::Topic, Status> {
    let schema = encode_ipc_stream(&topic.schema, &[])?;

    let tenant_id = match topic.tenant {
        Some(tenant) => tenant.to_string(),
        None => String::new(),
    };

    let broker_metadata = topic
        .broker_metadata
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect();

    Ok(pb::Topic {
        topic_id: topic.id.to_string(),
        name: topic.name.clone(),
        schema,
        tenant_id,
        broker_metadata,
    })
}
/// Decodes a wire `Topic` into a [`TopicDefinition`].
///
/// An empty `tenant_id` string means "no tenant" and decodes to `None`.
///
/// # Errors
///
/// Returns `Status::invalid_argument` if `topic_id` or a non-empty
/// `tenant_id` fails to parse, and propagates any failure from
/// `decode_ipc_schema`. Checks run in that order: topic id, schema, tenant.
pub fn topic_from_proto(wire: pb::Topic) -> Result<TopicDefinition, Status> {
    let pb::Topic {
        topic_id,
        name,
        schema,
        tenant_id,
        broker_metadata,
    } = wire;

    let id = TopicId::from_str(&topic_id)
        .map_err(|e| Status::invalid_argument(format!("invalid topic_id: {e}")))?;
    let schema = decode_ipc_schema(&schema)?;

    let tenant = match tenant_id.as_str() {
        "" => None,
        raw => {
            let parsed = TenantId::from_str(raw)
                .map_err(|e| Status::invalid_argument(format!("invalid tenant id: {e}")))?;
            Some(parsed)
        }
    };

    Ok(TopicDefinition {
        id,
        name,
        schema,
        tenant,
        broker_metadata: broker_metadata
            .into_iter()
            .collect::<BTreeMap<String, String>>(),
    })
}
/// Encodes a domain `MatchVerdict` as the `verdict` oneof of
/// `VerifyMaterializationResponse`.
///
/// Each domain variant maps onto the wire variant of the same name, with
/// payload fields carried over unchanged.
pub fn match_verdict_to_proto(
    verdict: jammi_db::store::manifest::MatchVerdict,
) -> pb::verify_materialization_response::Verdict {
    use jammi_db::store::manifest::MatchVerdict as Domain;
    use pb::verify_materialization_response::{self as msg, Verdict as Wire};

    match verdict {
        Domain::Match => Wire::Match(msg::Match {}),
        Domain::Mismatch { expected, found } => Wire::Mismatch(msg::Mismatch { expected, found }),
        Domain::MatchWithUnpinnedInputs { unpinned } => {
            Wire::MatchWithUnpinnedInputs(msg::MatchWithUnpinnedInputs { unpinned })
        }
        Domain::MissingManifest => Wire::MissingManifest(msg::MissingManifest {}),
    }
}
/// Decodes the `verdict` oneof of `VerifyMaterializationResponse` into the
/// domain `MatchVerdict`.
///
/// # Errors
///
/// Returns `Status::internal` when the oneof is unset (`None`).
pub fn match_verdict_from_proto(
    verdict: Option<pb::verify_materialization_response::Verdict>,
) -> Result<jammi_db::store::manifest::MatchVerdict, Status> {
    use jammi_db::store::manifest::MatchVerdict as Domain;
    use pb::verify_materialization_response::Verdict as Wire;

    // An unset oneof is the only failure; after this the mapping is total.
    let verdict = verdict.ok_or_else(|| {
        Status::internal("VerifyMaterializationResponse carried no verdict")
    })?;

    Ok(match verdict {
        Wire::Match(_) => Domain::Match,
        Wire::Mismatch(mismatch) => Domain::Mismatch {
            expected: mismatch.expected,
            found: mismatch.found,
        },
        Wire::MatchWithUnpinnedInputs(partial) => Domain::MatchWithUnpinnedInputs {
            unpinned: partial.unpinned,
        },
        Wire::MissingManifest(_) => Domain::MissingManifest,
    })
}
/// Encodes a domain `StaleReason` as a wire `StaleReason` message.
///
/// The returned message always has its `reason` oneof set.
fn stale_reason_to_proto(reason: jammi_db::store::StaleReason) -> pb::StaleReason {
    use jammi_db::store::StaleReason as Domain;
    use pb::stale_reason::{self as msg, Reason as Wire};

    pb::StaleReason {
        reason: Some(match reason {
            Domain::DefinitionChanged { recorded, current } => {
                Wire::DefinitionChanged(msg::DefinitionChanged { recorded, current })
            }
            Domain::InputAdvanced {
                source,
                recorded,
                current,
            } => Wire::InputAdvanced(msg::InputAdvanced {
                source,
                recorded,
                current,
            }),
            Domain::InputVanished { source } => {
                Wire::InputVanished(msg::InputVanished { source })
            }
        }),
    }
}
/// Decodes a wire `StaleReason` message into the domain `StaleReason`.
///
/// # Errors
///
/// Returns `Status::internal` when the message's `reason` oneof is unset.
fn stale_reason_from_proto(
    reason: pb::StaleReason,
) -> Result<jammi_db::store::StaleReason, Status> {
    use jammi_db::store::StaleReason as Domain;
    use pb::stale_reason::Reason as Wire;

    let inner = reason
        .reason
        .ok_or_else(|| Status::internal("StaleReason carried no reason"))?;

    Ok(match inner {
        Wire::DefinitionChanged(changed) => Domain::DefinitionChanged {
            recorded: changed.recorded,
            current: changed.current,
        },
        Wire::InputAdvanced(advanced) => Domain::InputAdvanced {
            source: advanced.source,
            recorded: advanced.recorded,
            current: advanced.current,
        },
        Wire::InputVanished(vanished) => Domain::InputVanished {
            source: vanished.source,
        },
    })
}
/// Encodes a domain `Staleness` verdict as the `staleness` oneof of
/// `StalenessResponse`.
///
/// Nested reasons (in `Stale` and `Undecidable`) are each converted with
/// `stale_reason_to_proto`.
pub fn staleness_to_proto(
    verdict: jammi_db::store::Staleness,
) -> pb::staleness_response::Staleness {
    use jammi_db::store::Staleness as Domain;
    use pb::staleness_response::{self as msg, Staleness as Wire};

    match verdict {
        Domain::Fresh => Wire::Fresh(msg::Fresh {}),
        Domain::Stale { reasons } => {
            let reasons = reasons.into_iter().map(stale_reason_to_proto).collect();
            Wire::Stale(msg::Stale { reasons })
        }
        Domain::Undecidable {
            unpinned,
            decided_reasons,
        } => {
            let decided_reasons = decided_reasons
                .into_iter()
                .map(stale_reason_to_proto)
                .collect();
            Wire::Undecidable(msg::Undecidable {
                unpinned,
                decided_reasons,
            })
        }
        Domain::MissingManifest => Wire::MissingManifest(msg::MissingManifest {}),
    }
}
/// Decodes the `staleness` oneof of `StalenessResponse` into the domain
/// `Staleness` verdict.
///
/// # Errors
///
/// Returns `Status::internal` when the oneof is unset, and propagates the
/// first error from `stale_reason_from_proto` for any nested reason.
pub fn staleness_from_proto(
    verdict: Option<pb::staleness_response::Staleness>,
) -> Result<jammi_db::store::Staleness, Status> {
    use jammi_db::store::Staleness as Domain;
    use pb::staleness_response::Staleness as Wire;

    let verdict =
        verdict.ok_or_else(|| Status::internal("StalenessResponse carried no staleness"))?;

    let decoded = match verdict {
        Wire::Fresh(_) => Domain::Fresh,
        Wire::Stale(stale) => {
            let reasons = stale
                .reasons
                .into_iter()
                .map(stale_reason_from_proto)
                .collect::<Result<_, Status>>()?;
            Domain::Stale { reasons }
        }
        Wire::Undecidable(undecidable) => {
            let decided_reasons = undecidable
                .decided_reasons
                .into_iter()
                .map(stale_reason_from_proto)
                .collect::<Result<_, Status>>()?;
            Domain::Undecidable {
                unpinned: undecidable.unpinned,
                decided_reasons,
            }
        }
        Wire::MissingManifest(_) => Domain::MissingManifest,
    };
    Ok(decoded)
}
fn anchor_kind_to_proto(kind: jammi_db::store::AnchorKind) -> pb::AnchorKind {
use jammi_db::store::AnchorKind;
match kind {
AnchorKind::ResultDigest => pb::AnchorKind::ResultDigest,
AnchorKind::MutableVersion => pb::AnchorKind::MutableVersion,
AnchorKind::SourceVersion => pb::AnchorKind::SourceVersion,
AnchorKind::UnpinnedAtInstant => pb::AnchorKind::UnpinnedAtInstant,
}
}
fn anchor_kind_from_proto(kind: i32) -> Result<jammi_db::store::AnchorKind, Status> {
use jammi_db::store::AnchorKind;
match pb::AnchorKind::try_from(kind) {
Ok(pb::AnchorKind::ResultDigest) => Ok(AnchorKind::ResultDigest),
Ok(pb::AnchorKind::MutableVersion) => Ok(AnchorKind::MutableVersion),
Ok(pb::AnchorKind::SourceVersion) => Ok(AnchorKind::SourceVersion),
Ok(pb::AnchorKind::UnpinnedAtInstant) => Ok(AnchorKind::UnpinnedAtInstant),
Ok(pb::AnchorKind::Unspecified) | Err(_) => Err(Status::internal(
"DerivesFromEdge carried an unspecified anchor kind",
)),
}
}
/// Encodes a domain `DerivesFromEdge` as its wire message.
///
/// `input` and `derived` are moved across unchanged; the anchor kind is sent
/// as the number of the wire enum produced by `anchor_kind_to_proto`.
pub fn derives_from_edge_to_proto(edge: jammi_db::store::DerivesFromEdge) -> pb::DerivesFromEdge {
    let jammi_db::store::DerivesFromEdge {
        input,
        derived,
        kind,
    } = edge;

    pb::DerivesFromEdge {
        input,
        derived,
        kind: anchor_kind_to_proto(kind) as i32,
    }
}
/// Decodes a wire `DerivesFromEdge` into the domain `DerivesFromEdge`.
///
/// # Errors
///
/// Propagates the `Status` from `anchor_kind_from_proto` when the edge's
/// `kind` is unspecified or unrecognised.
pub fn derives_from_edge_from_proto(
    edge: pb::DerivesFromEdge,
) -> Result<jammi_db::store::DerivesFromEdge, Status> {
    anchor_kind_from_proto(edge.kind).map(|kind| jammi_db::store::DerivesFromEdge {
        input: edge.input,
        derived: edge.derived,
        kind,
    })
}