use std::collections::BTreeMap;
use std::str::FromStr;
use jammi_db::catalog::model_repo::ModelRecord;
use jammi_db::catalog::source_repo::SourceDescriptor;
use jammi_db::source::{FileFormat, SourceConnection, SourceType};
use jammi_db::trigger::ids::TopicId;
use jammi_db::trigger::TopicDefinition;
use jammi_db::TenantId;
use tonic::Status;
use crate::proto::catalog as pb;
use crate::proto::embedding as embedding_pb;
use crate::{decode_ipc_schema, encode_ipc_stream, result_table_from_proto};
pub fn source_type_from_proto(kind: i32) -> Result<SourceType, Status> {
match pb::SourceKind::try_from(kind) {
Ok(pb::SourceKind::File) => Ok(SourceType::File),
Ok(pb::SourceKind::Postgres) => Ok(SourceType::Postgres),
Ok(pb::SourceKind::Mysql) => Ok(SourceType::Mysql),
Ok(pb::SourceKind::Unspecified) | Err(_) => {
Err(Status::invalid_argument("source_kind must be specified"))
}
}
}
pub fn source_type_to_proto(source_type: SourceType) -> pb::SourceKind {
match source_type {
SourceType::File => pb::SourceKind::File,
SourceType::Postgres => pb::SourceKind::Postgres,
SourceType::Mysql => pb::SourceKind::Mysql,
}
}
impl TryFrom<pb::SourceConnection> for SourceConnection {
type Error = Status;
fn try_from(conn: pb::SourceConnection) -> Result<Self, Self::Error> {
let url = if conn.url.is_empty() {
None
} else {
Some(conn.url)
};
Ok(SourceConnection {
url,
format: file_format_from_proto(conn.format)?,
..Default::default()
})
}
}
impl From<SourceConnection> for pb::SourceConnection {
fn from(conn: SourceConnection) -> Self {
pb::SourceConnection {
url: conn.url.unwrap_or_default(),
format: file_format_to_proto(conn.format) as i32,
}
}
}
fn file_format_to_proto(format: Option<FileFormat>) -> pb::FileFormat {
match format {
Some(FileFormat::Parquet) => pb::FileFormat::Parquet,
Some(FileFormat::Csv) => pb::FileFormat::Csv,
Some(FileFormat::Json) => pb::FileFormat::Json,
Some(FileFormat::Avro) => pb::FileFormat::Avro,
None => pb::FileFormat::Unspecified,
}
}
fn file_format_from_proto(format: i32) -> Result<Option<FileFormat>, Status> {
match pb::FileFormat::try_from(format) {
Ok(pb::FileFormat::Parquet) => Ok(Some(FileFormat::Parquet)),
Ok(pb::FileFormat::Csv) => Ok(Some(FileFormat::Csv)),
Ok(pb::FileFormat::Json) => Ok(Some(FileFormat::Json)),
Ok(pb::FileFormat::Avro) => Ok(Some(FileFormat::Avro)),
Ok(pb::FileFormat::Unspecified) | Err(_) => Ok(None),
}
}
impl From<SourceDescriptor> for pb::SourceDescriptor {
fn from(descriptor: SourceDescriptor) -> Self {
pb::SourceDescriptor {
source_id: descriptor.source_id,
kind: source_type_to_proto(descriptor.source_type) as i32,
status: descriptor.status,
result_tables: descriptor
.result_tables
.into_iter()
.map(embedding_pb::ResultTable::from)
.collect(),
}
}
}
pub fn source_descriptor_from_proto(
descriptor: pb::SourceDescriptor,
) -> Result<SourceDescriptor, Status> {
Ok(SourceDescriptor {
source_id: descriptor.source_id,
source_type: source_type_from_proto(descriptor.kind)?,
status: descriptor.status,
result_tables: descriptor
.result_tables
.into_iter()
.map(result_table_from_proto)
.collect::<Result<_, Status>>()?,
})
}
pub fn model_to_proto(record: &ModelRecord) -> pb::Model {
pb::Model {
model_id: record.model_id.clone(),
backend: record.backend.clone(),
task: super::model_task_to_proto(record.task) as i32,
status: record.status.clone(),
}
}
pub fn model_from_proto(model: pb::Model) -> Result<ModelRecord, Status> {
let task = super::model_task_from_proto(model.task)?;
Ok(ModelRecord {
model_id: model.model_id,
version: 1,
model_type: String::new(),
base_model_id: None,
backend: model.backend,
task,
artifact_path: None,
config_json: None,
status: model.status,
created_at: String::new(),
})
}
pub fn topic_to_proto(topic: &TopicDefinition) -> Result<pb::Topic, Status> {
let schema = encode_ipc_stream(&topic.schema, &[])?;
Ok(pb::Topic {
topic_id: topic.id.to_string(),
name: topic.name.clone(),
schema,
tenant_id: topic.tenant.map(|t| t.to_string()).unwrap_or_default(),
broker_metadata: topic.broker_metadata.clone().into_iter().collect(),
})
}
pub fn topic_from_proto(wire: pb::Topic) -> Result<TopicDefinition, Status> {
let id = TopicId::from_str(&wire.topic_id)
.map_err(|e| Status::invalid_argument(format!("invalid topic_id: {e}")))?;
let schema = decode_ipc_schema(&wire.schema)?;
let tenant = if wire.tenant_id.is_empty() {
None
} else {
Some(
TenantId::from_str(&wire.tenant_id)
.map_err(|e| Status::invalid_argument(format!("invalid tenant id: {e}")))?,
)
};
let broker_metadata: BTreeMap<String, String> = wire.broker_metadata.into_iter().collect();
Ok(TopicDefinition {
id,
name: wire.name,
schema,
tenant,
broker_metadata,
})
}