use crate::error::{JammiError, Result};
use crate::source::{SourceConnection, SourceType};
use super::backend::{SqlValue, TxOptions};
use super::Catalog;
/// A source registered in the catalog, as returned by `Catalog::get_source`
/// and `Catalog::list_sources`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SourceRecord {
/// Identifier of the source (the `source_id` column).
pub source_id: String,
/// Source kind, decoded from the JSON text stored in the `source_type` column.
pub source_type: SourceType,
/// Connection settings, decoded from the JSON text in the `options` column
/// (the type's default value when that column is NULL).
pub connection: SourceConnection,
/// Contents of the `schema_json` column, if any; passed through undecoded.
pub schema_json: Option<String>,
/// Status label; the queries in this file always select the literal `'active'`.
pub status: String,
/// Value of the `created_at` column, read as a string.
pub created_at: String,
/// Value of the `updated_at` column, read as a string.
pub updated_at: String,
}
impl Catalog {
pub async fn register_source(
&self,
source_id: &str,
source_type: SourceType,
connection: &SourceConnection,
) -> Result<()> {
let type_str =
serde_json::to_string(&source_type).map_err(|e| JammiError::Catalog(e.to_string()))?;
let uri = connection.url.as_deref().unwrap_or("").to_string();
let options =
serde_json::to_string(connection).map_err(|e| JammiError::Catalog(e.to_string()))?;
let sid = source_id.to_string();
let tenant = self.current_tenant();
self.backend()
.transaction(TxOptions::default(), |tx| {
Box::pin(async move {
tx.set_tenant(tenant);
tx.assert_tenant_matches(tenant, "sources")?;
tx.execute(
"INSERT INTO sources (source_id, name, source_type, uri, options, tenant_id) \
VALUES ($1, $2, $3, $4, $5, $6)",
&[
SqlValue::TextOwned(sid.clone()),
SqlValue::TextOwned(sid),
SqlValue::TextOwned(type_str),
SqlValue::TextOwned(uri),
SqlValue::TextOwned(options),
SqlValue::from(tenant.map(|t| t.to_string())),
],
)
.await?;
Ok(())
})
})
.await?;
Ok(())
}
pub async fn get_source(&self, source_id: &str) -> Result<Option<SourceRecord>> {
let sid = source_id.to_string();
let tenant = self.current_tenant();
let raw = self
.backend()
.transaction(
TxOptions {
read_only: true,
..Default::default()
},
|tx| {
Box::pin(async move {
tx.query_opt(
"SELECT source_id, source_type, options, schema_json, \
'active' AS status, created_at, updated_at \
FROM sources WHERE source_id = $1 \
AND (tenant_id = $2 OR tenant_id IS NULL)",
&[
SqlValue::TextOwned(sid),
SqlValue::from(tenant.map(|t| t.to_string())),
],
read_source_row,
)
.await
})
},
)
.await?;
raw.map(parse_source_row).transpose()
}
pub async fn list_sources(&self) -> Result<Vec<SourceRecord>> {
let tenant = self.current_tenant();
let raws = self
.backend()
.transaction(
TxOptions {
read_only: true,
..Default::default()
},
|tx| {
Box::pin(async move {
tx.query(
"SELECT source_id, source_type, options, schema_json, \
'active' AS status, created_at, updated_at \
FROM sources \
WHERE tenant_id = $1 OR tenant_id IS NULL \
ORDER BY created_at",
&[SqlValue::from(tenant.map(|t| t.to_string()))],
read_source_row,
)
.await
})
},
)
.await?;
raws.into_iter().map(parse_source_row).collect()
}
pub async fn remove_source(&self, source_id: &str) -> Result<()> {
let sid = source_id.to_string();
let tenant = self.current_tenant();
self.backend()
.transaction(TxOptions::default(), |tx| {
Box::pin(async move {
tx.set_tenant(tenant);
tx.execute(
"DELETE FROM sources WHERE source_id = $1 \
AND (tenant_id = $2 OR tenant_id IS NULL)",
&[
SqlValue::TextOwned(sid),
SqlValue::from(tenant.map(|t| t.to_string())),
],
)
.await?;
Ok(())
})
})
.await?;
Ok(())
}
}
/// Undecoded column values of one `sources` row, as produced by
/// `read_source_row` and consumed by `parse_source_row`.
struct RawSourceRow {
/// `source_id` column.
source_id: String,
/// `source_type` column; JSON text that `parse_source_row` decodes.
source_type: String,
/// `options` column; optional JSON text that `parse_source_row` decodes.
options: Option<String>,
/// `schema_json` column; optional and never decoded in this file.
schema_json: Option<String>,
/// `status` column of the result set.
status: String,
/// `created_at` column, read as a string.
created_at: String,
/// `updated_at` column, read as a string.
updated_at: String,
}
/// Row mapper handed to the backend's `query` / `query_opt` calls.
///
/// Reads the seven result columns by name into a [`RawSourceRow`] without
/// decoding any JSON. `options` and `schema_json` are fetched with `try_get`
/// into `Option<String>`; every other column is fetched with `get`.
///
/// # Errors
///
/// Returns the backend error of the first column read that fails.
fn read_source_row(
    row: &super::backend::Row<'_>,
) -> std::result::Result<RawSourceRow, super::backend::BackendError> {
    // Columns are read in the same order as the struct's field order.
    let source_id: String = row.get("source_id")?;
    let source_type: String = row.get("source_type")?;
    let options: Option<String> = row.try_get("options")?;
    let schema_json: Option<String> = row.try_get("schema_json")?;
    let status: String = row.get("status")?;
    let created_at: String = row.get("created_at")?;
    let updated_at: String = row.get("updated_at")?;

    Ok(RawSourceRow {
        source_id,
        source_type,
        options,
        schema_json,
        status,
        created_at,
        updated_at,
    })
}
fn parse_source_row(raw: RawSourceRow) -> Result<SourceRecord> {
let source_type: SourceType = serde_json::from_str(&raw.source_type)
.map_err(|e| JammiError::Catalog(format!("Invalid source_type: {e}")))?;
let connection: SourceConnection = raw
.options
.as_deref()
.map(serde_json::from_str)
.transpose()
.map_err(|e| JammiError::Catalog(format!("Invalid options: {e}")))?
.unwrap_or_default();
Ok(SourceRecord {
source_id: raw.source_id,
source_type,
connection,
schema_json: raw.schema_json,
status: raw.status,
created_at: raw.created_at,
updated_at: raw.updated_at,
})
}