use crate::{FieldSpec, SchemaOperation, SchemaPlan, StorageAdapter, StorageError};
use pylon_kernel::AppManifest;
/// Maps a manifest field type name to the Postgres column type used in DDL.
///
/// `int`, `float`, `bool` and `datetime` get dedicated column types. Every
/// other name — `string`, `richtext`, `id(...)` references and anything
/// unrecognised — is stored as `TEXT`.
fn pg_column_type(field_type: &str) -> &'static str {
    match field_type {
        "int" => "INTEGER",
        "float" => "DOUBLE PRECISION",
        "bool" => "BOOLEAN",
        "datetime" => "TIMESTAMPTZ",
        // Textual types, id references and unknown types all share TEXT.
        _ => "TEXT",
    }
}
/// Wraps `name` in double quotes so it can be used as a SQL identifier.
///
/// Any double quote inside `name` is doubled, which is how a quote character
/// is escaped inside a quoted identifier.
pub(crate) fn quote_ident(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape an embedded quote by emitting it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Public re-export of the crate-private [`quote_ident`]; only compiled with
/// the `postgres-live` feature.
#[cfg(feature = "postgres-live")]
pub fn quote_ident_pub(name: &str) -> String {
quote_ident(name)
}
/// Public entry point that forwards to `live::row_to_json`, converting one
/// Postgres row into a JSON value. Only compiled with `postgres-live`.
#[cfg(feature = "postgres-live")]
pub fn row_to_json_pub(row: &postgres::Row) -> serde_json::Value {
live::row_to_json(row)
}
/// Public entry point that forwards to
/// `LivePostgresAdapter::build_query_filtered_sql`.
///
/// Returns the SQL text and its bound parameters without touching a
/// connection. Only compiled with `postgres-live`.
#[cfg(feature = "postgres-live")]
pub fn build_query_filtered_sql_pub(
entity: &str,
filter: &serde_json::Value,
valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
live::LivePostgresAdapter::build_query_filtered_sql(entity, filter, valid_columns)
}
/// Public entry point that forwards to
/// `LivePostgresAdapter::build_aggregate_sql`.
///
/// Returns the SQL text, its bound parameters and the result column names.
/// Only compiled with `postgres-live`.
#[cfg(feature = "postgres-live")]
pub fn build_aggregate_sql_pub(
entity: &str,
spec: &serde_json::Value,
valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
live::LivePostgresAdapter::build_aggregate_sql(entity, spec, valid_columns)
}
/// Public entry point that forwards to `live::aggregate_rows_to_json`, which
/// wraps the aggregate result rows in a `{ "rows": [...] }` JSON object.
/// Only compiled with `postgres-live`.
#[cfg(feature = "postgres-live")]
pub fn aggregate_rows_to_json_pub(
rows: &[postgres::Row],
column_names: &[String],
) -> serde_json::Value {
live::aggregate_rows_to_json(rows, column_names)
}
/// Builds the `CREATE TABLE IF NOT EXISTS` statement for an entity.
///
/// The table always starts with an `id TEXT PRIMARY KEY NOT NULL` column,
/// followed by one column per entry in `fields`. Non-optional fields get
/// `NOT NULL` and unique fields get `UNIQUE`, in that order.
pub fn create_table_sql(entity_name: &str, fields: &[FieldSpec]) -> String {
    let id_column = std::iter::once("id TEXT PRIMARY KEY NOT NULL".to_string());
    let field_columns = fields.iter().map(|field| {
        let mut column = format!(
            "{} {}",
            quote_ident(&field.name),
            pg_column_type(&field.field_type)
        );
        if !field.optional {
            column.push_str(" NOT NULL");
        }
        if field.unique {
            column.push_str(" UNIQUE");
        }
        column
    });
    let column_list = id_column.chain(field_columns).collect::<Vec<_>>().join(", ");
    format!(
        "CREATE TABLE IF NOT EXISTS {} ({})",
        quote_ident(entity_name),
        column_list
    )
}
/// Builds the `ALTER TABLE ... ADD COLUMN` statement for a single field.
///
/// Only the column type and the optional `UNIQUE` suffix are emitted; the
/// field's `optional` flag is not consulted here, so the new column is always
/// nullable.
pub fn add_column_sql(entity_name: &str, field: &FieldSpec) -> String {
    let mut sql = format!(
        "ALTER TABLE {} ADD COLUMN {} {}",
        quote_ident(entity_name),
        quote_ident(&field.name),
        pg_column_type(&field.field_type)
    );
    if field.unique {
        sql.push_str(" UNIQUE");
    }
    sql
}
/// Builds a `CREATE [UNIQUE] INDEX IF NOT EXISTS` statement.
///
/// The index is named `{entity_name}_{index_name}`; the index name, the
/// table name and every column in `fields` are quoted as identifiers.
pub fn create_index_sql(
    entity_name: &str,
    index_name: &str,
    fields: &[String],
    unique: bool,
) -> String {
    let column_list = fields
        .iter()
        .map(|column| quote_ident(column))
        .collect::<Vec<String>>()
        .join(", ");
    let qualified_name = quote_ident(&format!("{entity_name}_{index_name}"));
    let table = quote_ident(entity_name);
    if unique {
        format!("CREATE UNIQUE INDEX IF NOT EXISTS {qualified_name} ON {table} ({column_list})")
    } else {
        format!("CREATE INDEX IF NOT EXISTS {qualified_name} ON {table} ({column_list})")
    }
}
/// Connection-less Postgres adapter: it plans a schema purely from the
/// manifest, without inspecting any existing database.
pub struct PostgresAdapter;

impl StorageAdapter for PostgresAdapter {
    /// Produces a plan that creates every entity in `target` followed by that
    /// entity's indexes. A manifest with no entities yields a single `Noop`.
    fn plan_schema(&self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
        let mut operations = Vec::new();
        for entity in &target.entities {
            let mut fields: Vec<FieldSpec> = Vec::new();
            for manifest_field in entity.fields.iter() {
                fields.push(FieldSpec {
                    name: manifest_field.name.clone(),
                    field_type: manifest_field.field_type.clone(),
                    optional: manifest_field.optional,
                    unique: manifest_field.unique,
                });
            }
            operations.push(SchemaOperation::CreateEntity {
                name: entity.name.clone(),
                fields,
            });
            // Index operations come right after the table they belong to.
            operations.extend(entity.indexes.iter().map(|index| SchemaOperation::AddIndex {
                entity: entity.name.clone(),
                name: index.name.clone(),
                fields: index.fields.clone(),
                unique: index.unique,
            }));
        }
        if operations.is_empty() {
            operations.push(SchemaOperation::Noop);
        }
        Ok(SchemaPlan { operations })
    }
}
/// Renders a schema plan as an ordered list of Postgres DDL statements.
///
/// * `AlterField` only reacts to a change of the `optional` flag (SET / DROP
///   NOT NULL); any other difference between `previous` and `target`
///   produces no statement.
/// * `CreateSearchIndex` needs the `postgres-live` feature and otherwise
///   fails with `PG_SEARCH_FEATURE_OFF`.
/// * `Noop` produces nothing; any operation not listed here fails with
///   `PG_OP_UNSUPPORTED`.
pub fn plan_to_sql(plan: &SchemaPlan) -> Result<Vec<String>, StorageError> {
    let mut statements = Vec::new();
    for op in &plan.operations {
        match op {
            SchemaOperation::Noop => {}
            SchemaOperation::CreateEntity { name, fields } => {
                statements.push(create_table_sql(name, fields));
            }
            SchemaOperation::AddField { entity, field } => {
                statements.push(add_column_sql(entity, field));
            }
            SchemaOperation::AddIndex {
                entity,
                name,
                fields,
                unique,
            } => {
                statements.push(create_index_sql(entity, name, fields, *unique));
            }
            SchemaOperation::AlterField {
                entity,
                previous,
                target,
            } => {
                // Only nullability transitions are translated into SQL.
                let nullability_action = match (previous.optional, target.optional) {
                    (true, false) => Some("SET NOT NULL"),
                    (false, true) => Some("DROP NOT NULL"),
                    _ => None,
                };
                if let Some(action) = nullability_action {
                    statements.push(format!(
                        "ALTER TABLE {} ALTER COLUMN {} {}",
                        quote_ident(entity),
                        quote_ident(&target.name),
                        action
                    ));
                }
            }
            SchemaOperation::CreateSearchIndex { entity, config } => {
                #[cfg(feature = "postgres-live")]
                {
                    statements.extend(crate::pg_search::create_search_index_sql(entity, config));
                }
                #[cfg(not(feature = "postgres-live"))]
                {
                    let _ = (entity, config);
                    return Err(StorageError {
                        code: "PG_SEARCH_FEATURE_OFF".into(),
                        message: "CreateSearchIndex requires the `postgres-live` feature".into(),
                    });
                }
            }
            SchemaOperation::RemoveSearchIndex { entity } => {
                let fts_table = quote_ident(&format!("_fts_{entity}"));
                let fts_index = quote_ident(&format!("{entity}_fts_gin"));
                statements.push(format!("DROP TABLE IF EXISTS {} CASCADE", fts_table));
                statements.push(format!("DROP INDEX IF EXISTS {}", fts_index));
            }
            other => {
                return Err(StorageError {
                    code: "PG_OP_UNSUPPORTED".into(),
                    message: format!("Operation not supported by Postgres adapter: {other:?}"),
                });
            }
        }
    }
    Ok(statements)
}
/// Lists the base tables in the `public` schema, skipping internal tables
/// whose name starts with the literal prefix `_pylon_`.
///
/// `_` is a single-character wildcard in `LIKE`, so the prefix is escaped
/// with an explicit `ESCAPE` character; without it, unrelated tables such as
/// `xpylonyfoo` would be filtered out too.
pub const INTROSPECT_TABLES_SQL: &str = "\
SELECT table_name \
FROM information_schema.tables \
WHERE table_schema = 'public' \
AND table_type = 'BASE TABLE' \
AND table_name NOT LIKE '!_pylon!_%' ESCAPE '!' \
ORDER BY table_name";
/// Lists the columns of one `public` table (`$1` = table name) in ordinal
/// order: name, data type, `is_nullable` ("YES"/"NO") and `is_pk`, a count
/// that is non-zero when the column is part of the primary key.
///
/// The primary-key subquery matches constraints on schema and table as well
/// as name. Constraint names are not unique across schemas, so joining on
/// the name alone could flag a column because a same-named table in another
/// schema has a different primary key.
pub const INTROSPECT_COLUMNS_SQL: &str = "\
SELECT column_name, data_type, is_nullable, \
(SELECT COUNT(*) FROM information_schema.table_constraints tc \
JOIN information_schema.key_column_usage kcu \
ON tc.constraint_name = kcu.constraint_name \
AND tc.constraint_schema = kcu.constraint_schema \
AND tc.table_schema = kcu.table_schema \
AND tc.table_name = kcu.table_name \
WHERE tc.table_schema = c.table_schema \
AND tc.table_name = c.table_name \
AND kcu.column_name = c.column_name \
AND tc.constraint_type = 'PRIMARY KEY') as is_pk \
FROM information_schema.columns c \
WHERE table_schema = 'public' AND table_name = $1 \
ORDER BY ordinal_position";
/// Lists the non-primary-key indexes of one `public` table (`$1` = table
/// name) from the `pg_index` / `pg_class` / `pg_attribute` catalogs.
///
/// Each row carries the index name, whether it is unique, and its column
/// names aggregated in the order they appear in `indkey`.
pub const INTROSPECT_INDEXES_SQL: &str = "\
SELECT i.relname as index_name, \
ix.indisunique as is_unique, \
array_agg(a.attname ORDER BY array_position(ix.indkey, a.attnum)) as columns \
FROM pg_index ix \
JOIN pg_class t ON t.oid = ix.indrelid \
JOIN pg_class i ON i.oid = ix.indexrelid \
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) \
JOIN pg_namespace n ON n.oid = t.relnamespace \
WHERE n.nspname = 'public' \
AND t.relname = $1 \
AND NOT ix.indisprimary \
GROUP BY i.relname, ix.indisunique \
ORDER BY i.relname";
/// Thin wrapper around the crate-level `plan_from_snapshot`: computes the
/// plan that takes the schema described by `snapshot` to `target`.
pub fn plan_from_snapshot(snapshot: &crate::SchemaSnapshot, target: &AppManifest) -> SchemaPlan {
crate::plan_from_snapshot(snapshot, target)
}
/// Generates a 40-character lowercase hex row id.
///
/// The first 32 digits are the nanoseconds since the Unix epoch (0 if the
/// system clock reads earlier than the epoch); the last 8 digits are a
/// process-wide counter that increments on every call.
pub fn generate_id() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicU32 = AtomicU32::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        // A clock set before the epoch degrades to a zero timestamp.
        Err(_) => 0,
    };
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("{nanos:032x}{seq:08x}")
}
pub fn json_value_to_string(val: &serde_json::Value) -> String {
match val {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => String::new(),
other => other.to_string(),
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum JsonParam {
Null,
Text(String),
Int(i64),
Float(f64),
Bool(bool),
}
impl JsonParam {
pub fn from_json(val: &serde_json::Value) -> Self {
match val {
serde_json::Value::Null => JsonParam::Null,
serde_json::Value::String(s) => JsonParam::Text(s.clone()),
serde_json::Value::Bool(b) => JsonParam::Bool(*b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
JsonParam::Int(i)
} else if let Some(f) = n.as_f64() {
JsonParam::Float(f)
} else {
JsonParam::Text(n.to_string())
}
}
other => JsonParam::Text(other.to_string()),
}
}
}
/// Encodes a [`JsonParam`] for whatever column type Postgres inferred for the
/// placeholder, so JSON-typed input can be bound without the caller knowing
/// the column type.
#[cfg(feature = "postgres-live")]
impl postgres::types::ToSql for JsonParam {
    /// Serialises the value for the target type `ty`.
    ///
    /// # Errors
    ///
    /// * An integer bound to `INT2`/`INT4` that does not fit the column width
    ///   is rejected instead of being silently wrapped.
    /// * Text bound to `TIMESTAMPTZ`/`TIMESTAMP`/`DATE` must be RFC 3339.
    fn to_sql(
        &self,
        ty: &postgres::types::Type,
        out: &mut bytes::BytesMut,
    ) -> Result<postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
        use postgres::types::Type;

        if matches!(self, JsonParam::Null) {
            return Ok(postgres::types::IsNull::Yes);
        }
        match (self, ty) {
            (JsonParam::Bool(b), &Type::BOOL) => b.to_sql(ty, out),
            // Narrowing with `as` would wrap out-of-range values and store a
            // different number than the caller sent; fail loudly instead.
            (JsonParam::Int(n), &Type::INT2) => i16::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for an INT2 column"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT4) => i32::try_from(*n)
                .map_err(|_| format!("integer {n} is out of range for an INT4 column"))?
                .to_sql(ty, out),
            (JsonParam::Int(n), &Type::INT8) => n.to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT4) => (*n as f32).to_sql(ty, out),
            (JsonParam::Int(n), &Type::FLOAT8) => (*n as f64).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT4) => (*f as f32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::FLOAT8) => f.to_sql(ty, out),
            // NOTE(review): float -> integer keeps the original truncating,
            // saturating `as` cast (3.7 becomes 3); confirm that is intended.
            (JsonParam::Float(f), &Type::INT4) => (*f as i32).to_sql(ty, out),
            (JsonParam::Float(f), &Type::INT8) => (*f as i64).to_sql(ty, out),
            (JsonParam::Text(s), &Type::TEXT)
            | (JsonParam::Text(s), &Type::VARCHAR)
            | (JsonParam::Text(s), &Type::BPCHAR)
            | (JsonParam::Text(s), &Type::NAME) => s.to_sql(ty, out),
            (JsonParam::Text(s), &Type::TIMESTAMPTZ) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMPTZ string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc);
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::TIMESTAMP) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid TIMESTAMP string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .naive_utc();
                dt.to_sql(ty, out)
            }
            (JsonParam::Text(s), &Type::DATE) => {
                let dt = chrono::DateTime::parse_from_rfc3339(s)
                    .map_err(|e| format!("invalid DATE string {s:?}: {e}"))?
                    .with_timezone(&chrono::Utc)
                    .date_naive();
                dt.to_sql(ty, out)
            }
            // Every other pairing is sent as the value's textual form.
            (other, _) => {
                let s = match other {
                    JsonParam::Bool(b) => b.to_string(),
                    JsonParam::Int(n) => n.to_string(),
                    JsonParam::Float(f) => f.to_string(),
                    JsonParam::Text(s) => s.clone(),
                    // Null returned early above.
                    JsonParam::Null => unreachable!(),
                };
                s.to_sql(ty, out)
            }
        }
    }

    /// Accepts every type; the per-type handling lives in `to_sql`.
    fn accepts(_ty: &postgres::types::Type) -> bool {
        true
    }

    postgres::types::to_sql_checked!();
}
/// Builds a parameterised `INSERT` for `entity` from a JSON object.
///
/// The `id` column is always first and bound to `$1`: it is the string `id`
/// from `data` when present, or a freshly generated id when `id` is missing
/// or null. The remaining keys follow in the object's iteration order.
///
/// # Errors
///
/// * `PG_INVALID_DATA` when `data` is not a JSON object.
/// * `PG_INVALID_ID` when `data.id` is present but is neither a string nor
///   null.
pub fn build_insert_sql(
    entity: &str,
    data: &serde_json::Value,
) -> Result<(String, Vec<JsonParam>), StorageError> {
    let obj = match data.as_object() {
        Some(map) => map,
        None => {
            return Err(StorageError {
                code: "PG_INVALID_DATA".into(),
                message: "Insert data must be a JSON object".into(),
            });
        }
    };
    let id = match obj.get("id") {
        Some(serde_json::Value::String(supplied)) => supplied.clone(),
        None | Some(serde_json::Value::Null) => generate_id(),
        Some(other) => {
            return Err(StorageError {
                code: "PG_INVALID_ID".into(),
                message: format!(
                    "Insert data carried a non-string `id` value: {other}. Pylon row ids \
                     are always strings (40-char hex). Drop the `id` field to let the \
                     server generate one, or supply a string."
                ),
            });
        }
    };

    let mut col_names = vec!["id".to_string()];
    let mut placeholders = vec!["$1".to_string()];
    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id)];
    for (key, val) in obj.iter().filter(|(key, _)| key.as_str() != "id") {
        values.push(JsonParam::from_json(val));
        col_names.push(quote_ident(key));
        // The placeholder number is the 1-based position of the value just bound.
        placeholders.push(format!("${}", values.len()));
    }

    let sql = format!(
        "INSERT INTO {} ({}) VALUES ({})",
        quote_ident(entity),
        col_names.join(", "),
        placeholders.join(", ")
    );
    Ok((sql, values))
}
/// Builds a parameterised `UPDATE ... WHERE id = $1` for `entity`.
///
/// `$1` is always the row id; each key of `data` becomes a
/// `"column" = $n` assignment starting at `$2`.
///
/// # Errors
///
/// * `PG_INVALID_DATA` when `data` is not an object or has no fields.
/// * `PG_INVALID_UPDATE` when the patch tries to change `id`.
pub fn build_update_sql(
    entity: &str,
    id: &str,
    data: &serde_json::Value,
) -> Result<(String, Vec<JsonParam>), StorageError> {
    let obj = match data.as_object() {
        Some(map) => map,
        None => {
            return Err(StorageError {
                code: "PG_INVALID_DATA".into(),
                message: "Update data must be a JSON object".into(),
            });
        }
    };
    if obj.is_empty() {
        return Err(StorageError {
            code: "PG_INVALID_DATA".into(),
            message: "Update data must contain at least one field".into(),
        });
    }
    if obj.contains_key("id") {
        return Err(StorageError {
            code: "PG_INVALID_UPDATE".into(),
            message:
                "Updating the `id` column is not allowed — Pylon row ids are immutable; \
                 drop the field from the patch."
                    .into(),
        });
    }

    let mut values: Vec<JsonParam> = vec![JsonParam::Text(id.to_string())];
    let mut set_clauses = Vec::new();
    for (key, val) in obj {
        values.push(JsonParam::from_json(val));
        // `$1` is the id, so the first assignment binds `$2`.
        set_clauses.push(format!("{} = ${}", quote_ident(key), values.len()));
    }
    if set_clauses.is_empty() {
        return Err(StorageError {
            code: "PG_INVALID_DATA".into(),
            message: "Update data must contain at least one non-id field".into(),
        });
    }

    let sql = format!(
        "UPDATE {} SET {} WHERE id = $1",
        quote_ident(entity),
        set_clauses.join(", ")
    );
    Ok((sql, values))
}
/// Borrows each [`JsonParam`] as the `ToSql` trait object that the
/// `postgres` client expects in its parameter slice.
#[cfg(feature = "postgres-live")]
fn as_pg_params(values: &[JsonParam]) -> Vec<&(dyn postgres::types::ToSql + Sync)> {
    let mut params: Vec<&(dyn postgres::types::ToSql + Sync)> = Vec::with_capacity(values.len());
    for value in values {
        params.push(value);
    }
    params
}
#[cfg(feature = "postgres-live")]
pub mod live {
use super::*;
use crate::{
ColumnSnapshot, IndexSnapshot, SchemaSnapshot, StorageAdapter, StorageError, TableSnapshot,
};
/// TLS decision extracted from a connection URL by `parse_pg_url_ssl`.
pub(super) struct PgUrlSsl {
/// `true` when the connection must be made through a TLS connector.
pub use_tls: bool,
}
/// Normalises the SSL-related query parameters of a Postgres URL.
///
/// Returns the rewritten URL plus whether a TLS connector is required:
///
/// * `sslmode=disable|allow` becomes `sslmode=disable` (no TLS).
/// * `sslmode=prefer` is kept as is (no TLS connector requested).
/// * `sslmode=require|verify-ca|verify-full`, an empty value or an unknown
///   value becomes `sslmode=require` and enables TLS; unknown values are
///   logged.
/// * `sslrootcert` is removed from the URL and enables TLS; a value other
///   than `system` or empty is logged as ignored.
/// * Every other parameter is passed through untouched.
///
/// A URL without a query string is returned unchanged with TLS off.
pub(super) fn parse_pg_url_ssl(url: &str) -> (String, PgUrlSsl) {
    let (base, query) = match url.split_once('?') {
        Some(parts) => parts,
        None => return (url.to_string(), PgUrlSsl { use_tls: false }),
    };

    let mut use_tls = false;
    let mut kept: Vec<String> = Vec::new();
    for pair in query.split('&').filter(|pair| !pair.is_empty()) {
        // A bare key with no `=` is treated as having an empty value.
        let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
        match k {
            "sslmode" => {
                let mode = v.to_ascii_lowercase();
                let normalized = match mode.as_str() {
                    "disable" | "allow" => "sslmode=disable",
                    "prefer" => "sslmode=prefer",
                    "require" | "verify-ca" | "verify-full" | "" => {
                        use_tls = true;
                        "sslmode=require"
                    }
                    other => {
                        tracing::warn!(
                            "[pg] unknown sslmode='{other}' — defaulting to require + TLS"
                        );
                        use_tls = true;
                        "sslmode=require"
                    }
                };
                kept.push(normalized.to_string());
            }
            "sslrootcert" => {
                if v != "system" && !v.is_empty() {
                    tracing::warn!(
                        "[pg] sslrootcert={v} ignored — native-tls uses system roots"
                    );
                }
                use_tls = true;
            }
            _ => kept.push(pair.to_string()),
        }
    }

    let cleaned = if kept.is_empty() {
        base.to_string()
    } else {
        format!("{}?{}", base, kept.join("&"))
    };
    (cleaned, PgUrlSsl { use_tls })
}
// Unit tests for `parse_pg_url_ssl`: sslmode normalisation, pass-through of
// unrelated parameters, and TLS activation.
#[cfg(test)]
mod url_tests {
use super::parse_pg_url_ssl;
// `verify-full` is rewritten to `require` and `sslrootcert` is dropped.
#[test]
fn strips_libpq_only_sslmode_verify_full() {
let (cleaned, ssl) = parse_pg_url_ssl(
"postgres://u:p@h:5432/db?sslmode=verify-full&sslrootcert=system",
);
assert!(ssl.use_tls);
assert_eq!(cleaned, "postgres://u:p@h:5432/db?sslmode=require");
}
// `sslmode=disable` is kept verbatim and leaves TLS off.
#[test]
fn passes_through_disable() {
let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db?sslmode=disable");
assert!(!ssl.use_tls);
assert_eq!(cleaned, "postgres://h/db?sslmode=disable");
}
// A URL without a query string is returned unchanged, TLS off.
#[test]
fn no_query_string_no_tls() {
let (cleaned, ssl) = parse_pg_url_ssl("postgres://h/db");
assert!(!ssl.use_tls);
assert_eq!(cleaned, "postgres://h/db");
}
// Parameters unrelated to SSL survive the rewrite.
#[test]
fn unknown_params_pass_through() {
let (cleaned, _) = parse_pg_url_ssl(
"postgres://h/db?application_name=pylon&connect_timeout=5",
);
assert!(cleaned.contains("application_name=pylon"));
assert!(cleaned.contains("connect_timeout=5"));
}
// `sslrootcert` on its own enables TLS and leaves no query string behind.
#[test]
fn sslrootcert_alone_enables_tls() {
let (cleaned, ssl) = parse_pg_url_ssl(
"postgres://h/db?sslrootcert=system",
);
assert!(ssl.use_tls);
assert_eq!(cleaned, "postgres://h/db");
}
}
/// Postgres adapter backed by a live, blocking `postgres::Client`.
pub struct LivePostgresAdapter {
// The single connection every method on this adapter runs its SQL through.
client: postgres::Client,
}
impl LivePostgresAdapter {
pub(crate) fn client_mut(&mut self) -> &mut postgres::Client {
&mut self.client
}
pub fn connect(url: &str) -> Result<Self, StorageError> {
let (cleaned, ssl) = parse_pg_url_ssl(url);
let result = if ssl.use_tls {
let connector =
native_tls::TlsConnector::new().map_err(|e| StorageError {
code: "PG_TLS_INIT_FAILED".into(),
message: format!("Failed to initialize TLS: {e}"),
})?;
let tls = postgres_native_tls::MakeTlsConnector::new(connector);
postgres::Client::connect(&cleaned, tls)
} else {
postgres::Client::connect(&cleaned, postgres::NoTls)
};
let client = result.map_err(|e| StorageError {
code: "PG_CONNECT_FAILED".into(),
message: format!("Failed to connect to Postgres: {e}"),
})?;
Ok(Self { client })
}
pub fn read_schema(&mut self) -> Result<SchemaSnapshot, StorageError> {
let table_rows = self
.client
.query(INTROSPECT_TABLES_SQL, &[])
.map_err(pg_err)?;
let mut tables = Vec::new();
for row in &table_rows {
let table_name: String = row.get(0);
let columns = self.read_columns(&table_name)?;
let indexes = self.read_indexes(&table_name)?;
tables.push(TableSnapshot {
name: table_name,
columns,
indexes,
});
}
Ok(SchemaSnapshot { tables })
}
fn read_columns(&mut self, table: &str) -> Result<Vec<ColumnSnapshot>, StorageError> {
let rows = self
.client
.query(INTROSPECT_COLUMNS_SQL, &[&table])
.map_err(pg_err)?;
let mut columns = Vec::new();
for row in &rows {
let name: String = row.get(0);
let data_type: String = row.get(1);
let is_nullable: String = row.get(2);
let is_pk: i64 = row.get(3);
columns.push(ColumnSnapshot {
name,
column_type: data_type,
notnull: is_nullable == "NO",
primary_key: is_pk > 0,
});
}
Ok(columns)
}
fn read_indexes(&mut self, table: &str) -> Result<Vec<IndexSnapshot>, StorageError> {
let rows = self
.client
.query(INTROSPECT_INDEXES_SQL, &[&table])
.map_err(pg_err)?;
let mut indexes = Vec::new();
for row in &rows {
let name: String = row.get(0);
let unique: bool = row.get(1);
let columns: Vec<String> = row.get(2);
indexes.push(IndexSnapshot {
name,
columns,
unique,
});
}
Ok(indexes)
}
pub fn plan_from_live(&mut self, target: &AppManifest) -> Result<SchemaPlan, StorageError> {
let snapshot = self.read_schema()?;
Ok(crate::plan_from_snapshot(&snapshot, target))
}
}
/// The trait methods take `&self`, whereas this adapter's real planning and
/// applying entry points take `&mut self`. Both trait methods therefore
/// return an error that names the inherent method to call instead.
impl StorageAdapter for LivePostgresAdapter {
    /// Always fails with `PG_PLAN_NEEDS_MUTABLE`; use `plan_from_live()`.
    fn plan_schema(&self, _target: &AppManifest) -> Result<SchemaPlan, StorageError> {
        let redirect = StorageError {
            code: "PG_PLAN_NEEDS_MUTABLE".into(),
            message: "Use plan_from_live() instead for live Postgres planning".into(),
        };
        Err(redirect)
    }

    /// Always fails with `PG_APPLY_USE_METHOD`; use `apply_plan()`.
    fn apply_schema(&self, _plan: &SchemaPlan) -> Result<(), StorageError> {
        let redirect = StorageError {
            code: "PG_APPLY_USE_METHOD".into(),
            message: "Use apply_plan() instead of the trait method for live Postgres".into(),
        };
        Err(redirect)
    }
}
impl LivePostgresAdapter {
/// Applies a schema plan to the database atomically.
///
/// The plan is rendered with `plan_to_sql` and all statements run inside one
/// transaction, so a failure part-way through rolls back the earlier
/// statements instead of leaving the schema half-migrated.
///
/// NOTE(review): assumes every generated statement may run inside a
/// transaction. That holds for the DDL built in this file; confirm it for
/// the statements produced by `pg_search::create_search_index_sql`.
///
/// # Errors
///
/// Returns the `plan_to_sql` error for unsupported operations, or the
/// database error of the first failing statement.
pub fn apply_plan(&mut self, plan: &SchemaPlan) -> Result<(), StorageError> {
    let statements = plan_to_sql(plan)?;
    if statements.is_empty() {
        return Ok(());
    }
    let mut tx = self.client.transaction().map_err(pg_err)?;
    for sql in &statements {
        // An early return drops `tx`, which rolls the transaction back.
        tx.execute(sql.as_str(), &[]).map_err(pg_err)?;
    }
    tx.commit().map_err(pg_err)
}
/// Executes `sql` verbatim with no bound parameters and returns the number
/// of affected rows. The statement is not validated or escaped here.
pub fn exec_raw(&mut self, sql: &str) -> Result<u64, StorageError> {
self.client.execute(sql, &[]).map_err(pg_err)
}
/// Inserts one row into `entity` and returns its id.
///
/// The statement and parameters come from `build_insert_sql`; the returned
/// id is the first bound parameter (supplied by the caller or generated).
///
/// # Errors
///
/// Propagates `build_insert_sql` validation errors and database errors;
/// `PG_INTERNAL` if the builder did not put a text id first.
pub fn insert(
    &mut self,
    entity: &str,
    data: &serde_json::Value,
) -> Result<String, StorageError> {
    let (sql, values) = build_insert_sql(entity, data)?;
    // `first()` instead of indexing: a malformed builder result becomes an
    // error rather than a panic.
    let id = match values.first() {
        Some(JsonParam::Text(s)) => s.clone(),
        _ => {
            return Err(StorageError {
                code: "PG_INTERNAL".into(),
                message: "build_insert_sql produced non-text id param".into(),
            });
        }
    };
    let params = as_pg_params(&values);
    self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
    Ok(id)
}
pub fn get_by_id(
&mut self,
entity: &str,
id: &str,
) -> Result<Option<serde_json::Value>, StorageError> {
let sql = format!("SELECT * FROM {} WHERE id = $1", quote_ident(entity));
let rows = self.client.query(sql.as_str(), &[&id]).map_err(pg_err)?;
match rows.first() {
Some(row) => Ok(Some(row_to_json(row))),
None => Ok(None),
}
}
/// Returns every row of `entity` as JSON, with no ordering or limit applied.
pub fn list(&mut self, entity: &str) -> Result<Vec<serde_json::Value>, StorageError> {
    let table = quote_ident(entity);
    let sql = format!("SELECT * FROM {}", table);
    self.client
        .query(sql.as_str(), &[])
        .map(|rows| rows.iter().map(row_to_json).collect())
        .map_err(pg_err)
}
/// Returns one page of `entity` rows ordered by ascending `id`.
///
/// With `after = Some(cursor)` only rows whose id is greater than the cursor
/// are returned. `limit` is capped at 10 000 rows.
pub fn list_after(
    &mut self,
    entity: &str,
    after: Option<&str>,
    limit: usize,
) -> Result<Vec<serde_json::Value>, StorageError> {
    let capped: i64 = limit.min(10_000) as i64;
    let table = quote_ident(entity);
    let rows = match after {
        Some(cursor) => {
            let sql = format!(
                "SELECT * FROM {} WHERE id > $1 ORDER BY id ASC LIMIT $2",
                table
            );
            self.client.query(sql.as_str(), &[&cursor, &capped])
        }
        None => {
            let sql = format!("SELECT * FROM {} ORDER BY id ASC LIMIT $1", table);
            self.client.query(sql.as_str(), &[&capped])
        }
    }
    .map_err(pg_err)?;
    Ok(rows.iter().map(row_to_json).collect())
}
/// Applies the JSON patch `data` to the row of `entity` with the given `id`.
///
/// Returns `true` when a row was updated and `false` when no row has that
/// id.
///
/// # Errors
///
/// Propagates `build_update_sql` validation errors and database errors.
pub fn update(
    &mut self,
    entity: &str,
    id: &str,
    data: &serde_json::Value,
) -> Result<bool, StorageError> {
    let (sql, values) = build_update_sql(entity, id, data)?;
    let params = as_pg_params(&values);
    let rows_affected = self.client.execute(sql.as_str(), &params).map_err(pg_err)?;
    Ok(rows_affected > 0)
}
/// Deletes the row of `entity` with the given `id`.
///
/// Returns `true` when a row was removed and `false` when none matched.
pub fn delete(&mut self, entity: &str, id: &str) -> Result<bool, StorageError> {
    let table = quote_ident(entity);
    let sql = format!("DELETE FROM {} WHERE id = $1", table);
    self.client
        .execute(sql.as_str(), &[&id])
        .map(|removed| removed > 0)
        .map_err(pg_err)
}
/// Returns the first row of `entity` whose `field` column equals `value`,
/// or `Ok(None)` when there is no such row.
///
/// Both `entity` and `field` are quoted as identifiers; `value` is bound as
/// a parameter.
pub fn lookup_field(
    &mut self,
    entity: &str,
    field: &str,
    value: &str,
) -> Result<Option<serde_json::Value>, StorageError> {
    let table = quote_ident(entity);
    let column = quote_ident(field);
    let sql = format!("SELECT * FROM {} WHERE {} = $1 LIMIT 1", table, column);
    let rows = self.client.query(sql.as_str(), &[&value]).map_err(pg_err)?;
    let first_match = rows.first().map(row_to_json);
    Ok(first_match)
}
/// Runs a filtered `SELECT` against `entity` and returns the rows as JSON.
///
/// The SQL and its parameters are produced by `build_query_filtered_sql`;
/// `valid_columns` is the whitelist of column names the filter may use.
///
/// # Errors
///
/// Propagates filter validation errors and database errors.
pub fn query_filtered(
    &mut self,
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<Vec<serde_json::Value>, StorageError> {
    let (sql, params) = Self::build_query_filtered_sql(entity, filter, valid_columns)?;
    let pg_params = as_pg_params(&params);
    let rows = self
        .client
        .query(sql.as_str(), &pg_params)
        .map_err(pg_err)?;
    Ok(rows.iter().map(row_to_json).collect())
}
/// Translates a JSON filter object into a parameterised
/// `SELECT * FROM entity ...` statement.
///
/// Recognised keys:
///
/// * `$search` — full-text match against the entity's `_fts_{entity}` table.
/// * `$order` — object of `column: "asc" | "desc"`; defaults to `ORDER BY "id"`.
/// * `$limit` / `$offset` — non-negative integers, inlined as literals.
/// * any other key — a column name. A plain value means equality; an object
///   value may contain `$not`, `$gt`, `$gte`, `$lt`, `$lte`, `$like`
///   (substring match) and `$in` (array). Unknown operators are ignored.
///
/// A `null` equality value compiles to `IS NULL` and `$not: null` to
/// `IS NOT NULL`, because `= NULL` / `!= NULL` never match in SQL.
///
/// A filter that is not a JSON object is treated as empty.
///
/// # Errors
///
/// `UNKNOWN_COLUMN` when a filter or order column is neither `id` nor listed
/// in `valid_columns`.
pub(crate) fn build_query_filtered_sql(
    entity: &str,
    filter: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>), StorageError> {
    let empty = serde_json::Map::new();
    let obj = filter.as_object().unwrap_or(&empty);
    let validate = |col: &str| -> Result<(), StorageError> {
        if col == "id" || valid_columns.iter().any(|c| c == col) {
            Ok(())
        } else {
            Err(StorageError {
                code: "UNKNOWN_COLUMN".into(),
                message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
            })
        }
    };
    let mut where_clauses: Vec<String> = Vec::new();
    let mut order_clause = String::new();
    let mut limit_clause = String::new();
    let mut offset_clause = String::new();
    // One entry per bound value, in placeholder order (`$1`, `$2`, ...).
    // `Some((column, operator))` entries still need their `column op $n`
    // clause emitted below; `None` entries are already referenced by a
    // clause that was pushed inline (`$search`, `$in`).
    let mut planned: Vec<(Option<(String, &'static str)>, JsonParam)> = Vec::new();

    for (key, val) in obj {
        match key.as_str() {
            "$search" => {
                let raw = match val {
                    serde_json::Value::String(s) => s.clone(),
                    other => other.to_string(),
                };
                let placeholder_n = planned.len() + 1;
                // Quote the FTS table name the same way `plan_to_sql` does so
                // that a `"` in the entity name cannot break out of the
                // identifier.
                where_clauses.push(format!(
                    "{}.id IN (SELECT entity_id FROM {} \
                     WHERE tsv @@ plainto_tsquery('english', ${placeholder_n}))",
                    quote_ident(entity),
                    quote_ident(&format!("_fts_{entity}")),
                ));
                planned.push((None, JsonParam::Text(raw)));
            }
            "$order" => {
                if let Some(ord) = val.as_object() {
                    let mut parts = Vec::new();
                    for (col, dir) in ord {
                        validate(col)?;
                        let d = match dir.as_str().unwrap_or("asc") {
                            "desc" | "DESC" => "DESC",
                            _ => "ASC",
                        };
                        parts.push(format!("{} {d}", quote_ident(col)));
                    }
                    if !parts.is_empty() {
                        order_clause = format!(" ORDER BY {}", parts.join(", "));
                    }
                }
            }
            "$limit" => {
                if let Some(n) = val.as_u64() {
                    limit_clause = format!(" LIMIT {}", n);
                }
            }
            "$offset" => {
                if let Some(n) = val.as_u64() {
                    offset_clause = format!(" OFFSET {}", n);
                }
            }
            field => {
                validate(field)?;
                match val {
                    serde_json::Value::Object(ops) => {
                        for (op, v) in ops {
                            let comparison = match op.as_str() {
                                "$not" => Some("!="),
                                "$gt" => Some(">"),
                                "$gte" => Some(">="),
                                "$lt" => Some("<"),
                                "$lte" => Some("<="),
                                _ => None,
                            };
                            if let Some(sql_op) = comparison {
                                if sql_op == "!=" && v.is_null() {
                                    // `col != NULL` is never true; express
                                    // "not null" the way SQL requires.
                                    where_clauses
                                        .push(format!("{} IS NOT NULL", quote_ident(field)));
                                } else {
                                    planned.push((
                                        Some((field.to_string(), sql_op)),
                                        value_to_pg(v),
                                    ));
                                }
                                continue;
                            }
                            match op.as_str() {
                                "$like" => {
                                    let raw = match v {
                                        serde_json::Value::String(s) => s.clone(),
                                        other => other.to_string(),
                                    };
                                    planned.push((
                                        Some((field.to_string(), "LIKE")),
                                        JsonParam::Text(format!("%{raw}%")),
                                    ));
                                }
                                "$in" => {
                                    if let Some(arr) = v.as_array() {
                                        if arr.is_empty() {
                                            // An empty set matches nothing.
                                            where_clauses.push("FALSE".into());
                                        } else {
                                            let first = planned.len() + 1;
                                            let placeholders: Vec<String> = (0..arr.len())
                                                .map(|i| format!("${}", first + i))
                                                .collect();
                                            where_clauses.push(format!(
                                                "{} IN ({})",
                                                quote_ident(field),
                                                placeholders.join(", "),
                                            ));
                                            planned.extend(
                                                arr.iter().map(|x| (None, value_to_pg(x))),
                                            );
                                        }
                                    }
                                }
                                _ => {}
                            }
                        }
                    }
                    // `col = NULL` is never true; match SQL NULLs explicitly.
                    serde_json::Value::Null => {
                        where_clauses.push(format!("{} IS NULL", quote_ident(field)));
                    }
                    _ => planned.push((Some((field.to_string(), "=")), value_to_pg(val))),
                }
            }
        }
    }

    // Emit the deferred comparisons; each value's placeholder number is its
    // 1-based position in `planned`, which is what the inline clauses above
    // already assumed.
    let mut params: Vec<JsonParam> = Vec::with_capacity(planned.len());
    for (target, value) in planned {
        if let Some((column, op)) = target {
            where_clauses.push(format!(
                "{} {} ${}",
                quote_ident(&column),
                op,
                params.len() + 1
            ));
        }
        params.push(value);
    }

    let where_sql = if where_clauses.is_empty() {
        String::new()
    } else {
        format!(" WHERE {}", where_clauses.join(" AND "))
    };
    let final_order = if order_clause.is_empty() {
        format!(" ORDER BY {}", quote_ident("id"))
    } else {
        order_clause
    };
    let sql = format!(
        "SELECT * FROM {}{}{}{}{}",
        quote_ident(entity),
        where_sql,
        final_order,
        limit_clause,
        offset_clause,
    );
    Ok((sql, params))
}
/// Runs an aggregate query described by `spec` against `entity`.
///
/// The SQL, parameters and result column names come from
/// `build_aggregate_sql`; the rows are returned as `{ "rows": [...] }`.
///
/// # Errors
///
/// Propagates spec validation errors and database errors.
pub fn aggregate(
    &mut self,
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<serde_json::Value, StorageError> {
    let (sql, params, column_names) =
        Self::build_aggregate_sql(entity, spec, valid_columns)?;
    let pg_params = as_pg_params(&params);
    let rows = self
        .client
        .query(sql.as_str(), &pg_params)
        .map_err(pg_err)?;
    Ok(aggregate_rows_to_json(&rows, &column_names))
}
/// Translates a JSON aggregate spec into a parameterised `SELECT`.
///
/// Recognised keys of `spec`:
///
/// * `count` — `"*"` (alias `count`) or a column name (alias `count_{col}`).
/// * `sum` / `avg` / `min` / `max` — arrays of column names, aliased
///   `{fn}_{col}`.
/// * `countDistinct` — array of column names, aliased `count_distinct_{col}`.
/// * `groupBy` — array of column names, or objects `{ field, bucket }` that
///   group by `date_trunc(bucket, field)` (bucket defaults to `day`), aliased
///   `{field}_{bucket}`.
/// * `where` — object of `column: value` equality conditions. A `null`
///   value compiles to `IS NULL`, because `= NULL` never matches in SQL.
///
/// Returns the SQL, its bound parameters, and the result column names
/// (group columns first, then aggregates).
///
/// # Errors
///
/// * `INVALID_QUERY` when `spec` is not an object, selects nothing, or uses
///   an unsupported bucket / a group object without `field`.
/// * `UNKNOWN_COLUMN` when a referenced column is neither `id` nor listed in
///   `valid_columns`.
pub(crate) fn build_aggregate_sql(
    entity: &str,
    spec: &serde_json::Value,
    valid_columns: &[String],
) -> Result<(String, Vec<JsonParam>, Vec<String>), StorageError> {
    let obj = spec.as_object().ok_or_else(|| StorageError {
        code: "INVALID_QUERY".into(),
        message: "aggregate spec must be a JSON object".into(),
    })?;
    let validate = |col: &str| -> Result<(), StorageError> {
        if col == "id" || valid_columns.iter().any(|c| c == col) {
            Ok(())
        } else {
            Err(StorageError {
                code: "UNKNOWN_COLUMN".into(),
                message: format!("Unknown column \"{col}\" on entity \"{entity}\""),
            })
        }
    };

    // Aggregate expressions and the aliases they are returned under.
    let mut select_parts: Vec<String> = Vec::new();
    let mut result_fields: Vec<String> = Vec::new();

    match obj.get("count") {
        Some(serde_json::Value::String(s)) if s == "*" => {
            select_parts.push("COUNT(*) AS count".into());
            result_fields.push("count".into());
        }
        Some(serde_json::Value::String(field)) => {
            validate(field)?;
            let alias = format!("count_{field}");
            select_parts.push(format!(
                "COUNT({}) AS {}",
                quote_ident(field),
                quote_ident(&alias),
            ));
            result_fields.push(alias);
        }
        // Missing or non-string `count` contributes nothing.
        _ => {}
    }

    for (fn_name, prefix) in [
        ("sum", "sum_"),
        ("avg", "avg_"),
        ("min", "min_"),
        ("max", "max_"),
    ] {
        if let Some(fields) = obj.get(fn_name).and_then(|v| v.as_array()) {
            let sql_fn = fn_name.to_uppercase();
            // Non-string array entries are skipped.
            for f in fields.iter().filter_map(|field| field.as_str()) {
                validate(f)?;
                let alias = format!("{prefix}{f}");
                select_parts.push(format!(
                    "{}({}) AS {}",
                    sql_fn,
                    quote_ident(f),
                    quote_ident(&alias),
                ));
                result_fields.push(alias);
            }
        }
    }

    if let Some(fields) = obj.get("countDistinct").and_then(|v| v.as_array()) {
        for f in fields.iter().filter_map(|field| field.as_str()) {
            validate(f)?;
            let alias = format!("count_distinct_{f}");
            select_parts.push(format!(
                "COUNT(DISTINCT {}) AS {}",
                quote_ident(f),
                quote_ident(&alias),
            ));
            result_fields.push(alias);
        }
    }

    let mut group_by: Vec<String> = Vec::new();
    let mut group_select: Vec<String> = Vec::new();
    let mut group_field_names: Vec<String> = Vec::new();
    if let Some(groups) = obj.get("groupBy").and_then(|v| v.as_array()) {
        for g in groups {
            if let Some(f) = g.as_str() {
                validate(f)?;
                let q = quote_ident(f);
                group_by.push(q.clone());
                group_select.push(q);
                group_field_names.push(f.to_string());
            } else if let Some(spec) = g.as_object() {
                let field = spec.get("field").and_then(|v| v.as_str()).ok_or_else(|| {
                    StorageError {
                        code: "INVALID_QUERY".into(),
                        message: "groupBy object spec requires `field`".into(),
                    }
                })?;
                validate(field)?;
                let bucket = spec.get("bucket").and_then(|v| v.as_str()).unwrap_or("day");
                // The unit is spliced into the SQL text, so only whitelisted
                // values are allowed through.
                let trunc_unit = match bucket {
                    "hour" | "day" | "week" | "month" | "year" => bucket,
                    _ => {
                        return Err(StorageError {
                            code: "INVALID_QUERY".into(),
                            message: format!(
                                "bucket must be one of hour/day/week/month/year, got {bucket}"
                            ),
                        });
                    }
                };
                let alias = format!("{field}_{bucket}");
                let expr = format!("date_trunc('{}', {})", trunc_unit, quote_ident(field));
                group_by.push(expr.clone());
                group_select.push(format!("{} AS {}", expr, quote_ident(&alias)));
                group_field_names.push(alias);
            }
        }
    }

    let mut full_select = group_select;
    full_select.extend(select_parts);
    if full_select.is_empty() {
        return Err(StorageError {
            code: "INVALID_QUERY".into(),
            message: "aggregate spec must include count/sum/avg/min/max/groupBy".into(),
        });
    }

    let mut where_clauses: Vec<String> = Vec::new();
    let mut params: Vec<JsonParam> = Vec::new();
    if let Some(w) = obj.get("where").and_then(|v| v.as_object()) {
        for (k, v) in w {
            validate(k)?;
            if v.is_null() {
                // `col = NULL` is never true; match SQL NULLs explicitly.
                where_clauses.push(format!("{} IS NULL", quote_ident(k)));
            } else {
                params.push(value_to_pg(v));
                where_clauses.push(format!("{} = ${}", quote_ident(k), params.len()));
            }
        }
    }

    let where_sql = if where_clauses.is_empty() {
        String::new()
    } else {
        format!(" WHERE {}", where_clauses.join(" AND "))
    };
    let group_sql = if group_by.is_empty() {
        String::new()
    } else {
        format!(" GROUP BY {}", group_by.join(", "))
    };
    let sql = format!(
        "SELECT {} FROM {}{}{}",
        full_select.join(", "),
        quote_ident(entity),
        where_sql,
        group_sql,
    );

    // Same order as the SELECT list: group columns, then aggregates.
    let mut column_names = group_field_names;
    column_names.extend(result_fields);
    Ok((sql, params, column_names))
}
}
/// Converts aggregate result rows into `{ "rows": [...] }`.
///
/// Each row is converted with `row_to_json`; when that yields a JSON object,
/// only the keys listed in `column_names` are kept (names missing from the
/// row are skipped). Any other JSON value is passed through unchanged.
pub fn aggregate_rows_to_json(
    rows: &[postgres::Row],
    column_names: &[String],
) -> serde_json::Value {
    let out: Vec<serde_json::Value> = rows
        .iter()
        .map(|row| match row_to_json(row) {
            serde_json::Value::Object(map) => {
                let filtered: serde_json::Map<String, serde_json::Value> = column_names
                    .iter()
                    .filter_map(|name| map.get(name).map(|v| (name.clone(), v.clone())))
                    .collect();
                serde_json::Value::Object(filtered)
            }
            other => other,
        })
        .collect();
    serde_json::json!({ "rows": out })
}
/// One write operation to run inside `LivePostgresAdapter::transact`.
/// All payloads are borrowed from the caller for the lifetime `'a`.
pub enum TxOp<'a> {
/// Insert `data` (a JSON object) as a new row of `entity`.
Insert {
entity: &'a str,
data: &'a serde_json::Value,
},
/// Apply the JSON patch `data` to the row of `entity` identified by `id`.
Update {
entity: &'a str,
id: &'a str,
data: &'a serde_json::Value,
},
/// Delete the row of `entity` identified by `id`.
Delete {
entity: &'a str,
id: &'a str,
},
}
/// Outcome of one `TxOp`, reported in the same order as the operations.
#[derive(Debug, Clone)]
pub enum TxResult {
/// Id of the inserted row.
Inserted(String),
/// Whether the update affected at least one row.
Updated(bool),
/// Whether the delete affected at least one row.
Deleted(bool),
}
impl LivePostgresAdapter {
    /// Runs `ops` in order inside a single transaction.
    ///
    /// Returns one [`TxResult`] per operation, in input order. The
    /// transaction is committed only after every operation succeeded.
    ///
    /// # Errors
    ///
    /// Returns the first validation or database error. In that case the
    /// function exits before `commit` is reached, so nothing is committed.
    pub fn transact(&mut self, ops: &[TxOp<'_>]) -> Result<Vec<TxResult>, StorageError> {
        let mut tx = self.client.transaction().map_err(pg_err)?;
        let mut results: Vec<TxResult> = Vec::with_capacity(ops.len());
        for op in ops {
            match op {
                TxOp::Insert { entity, data } => {
                    let (sql, values) = build_insert_sql(entity, data)?;
                    // The id is always the first bound parameter; use
                    // `first()` so a malformed builder result is an error
                    // rather than a panic.
                    let id = match values.first() {
                        Some(JsonParam::Text(s)) => s.clone(),
                        _ => {
                            return Err(StorageError {
                                code: "PG_INTERNAL".into(),
                                message: "build_insert_sql produced non-text id param".into(),
                            });
                        }
                    };
                    let params = as_pg_params(&values);
                    tx.execute(sql.as_str(), &params).map_err(pg_err)?;
                    results.push(TxResult::Inserted(id));
                }
                TxOp::Update { entity, id, data } => {
                    let (sql, values) = build_update_sql(entity, id, data)?;
                    let params = as_pg_params(&values);
                    let n = tx.execute(sql.as_str(), &params).map_err(pg_err)?;
                    results.push(TxResult::Updated(n > 0));
                }
                TxOp::Delete { entity, id } => {
                    let sql = format!("DELETE FROM {} WHERE id = $1", quote_ident(entity));
                    let n = tx.execute(sql.as_str(), &[id]).map_err(pg_err)?;
                    results.push(TxResult::Deleted(n > 0));
                }
            }
        }
        tx.commit().map_err(pg_err)?;
        Ok(results)
    }
}
/// Shorthand used by the query builders: converts a JSON value into a
/// bindable parameter via `JsonParam::from_json`.
fn value_to_pg(v: &serde_json::Value) -> JsonParam {
JsonParam::from_json(v)
}
pub(super) fn row_to_json(row: &postgres::Row) -> serde_json::Value {
use postgres::types::Type;
let mut obj = serde_json::Map::new();
for (i, col) in row.columns().iter().enumerate() {
let name = col.name().to_string();
let value: serde_json::Value = match *col.type_() {
Type::BOOL => try_get_or_null::<Option<bool>>(row, i)
.flatten()
.map(serde_json::Value::Bool)
.unwrap_or(serde_json::Value::Null),
Type::INT2 => try_get_or_null::<Option<i16>>(row, i)
.flatten()
.map(|v| serde_json::Value::Number(v.into()))
.unwrap_or(serde_json::Value::Null),
Type::INT4 => try_get_or_null::<Option<i32>>(row, i)
.flatten()
.map(|v| serde_json::Value::Number(v.into()))
.unwrap_or(serde_json::Value::Null),
Type::INT8 => try_get_or_null::<Option<i64>>(row, i)
.flatten()
.map(|v| serde_json::Value::Number(v.into()))
.unwrap_or(serde_json::Value::Null),
Type::FLOAT4 => try_get_or_null::<Option<f32>>(row, i)
.flatten()
.and_then(|v| serde_json::Number::from_f64(v as f64))
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
Type::FLOAT8 => try_get_or_null::<Option<f64>>(row, i)
.flatten()
.and_then(serde_json::Number::from_f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
Type::JSON | Type::JSONB => try_get_or_null::<Option<serde_json::Value>>(row, i)
.flatten()
.unwrap_or(serde_json::Value::Null),
Type::BYTEA => try_get_or_null::<Option<Vec<u8>>>(row, i)
.flatten()
.map(|b| serde_json::Value::String(b64(&b)))
.unwrap_or(serde_json::Value::Null),
Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME | Type::UNKNOWN => {
try_get_or_null::<Option<String>>(row, i)
.flatten()
.map(serde_json::Value::String)
.unwrap_or(serde_json::Value::Null)
}
Type::TIMESTAMPTZ => {
try_get_or_null::<Option<chrono::DateTime<chrono::Utc>>>(row, i)
.flatten()
.map(|dt| {
serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
})
.unwrap_or(serde_json::Value::Null)
}
Type::TIMESTAMP => try_get_or_null::<Option<chrono::NaiveDateTime>>(row, i)
.flatten()
.map(|dt| {
serde_json::Value::String(dt.format("%Y-%m-%dT%H:%M:%SZ").to_string())
})
.unwrap_or(serde_json::Value::Null),
Type::DATE => try_get_or_null::<Option<chrono::NaiveDate>>(row, i)
.flatten()
.map(|d| serde_json::Value::String(d.format("%Y-%m-%d").to_string()))
.unwrap_or(serde_json::Value::Null),
_ => {
match row.try_get::<_, Option<String>>(i) {
Ok(Some(s)) => serde_json::Value::String(s),
Ok(None) => serde_json::Value::Null,
Err(_) => match row.try_get::<_, Option<Vec<u8>>>(i) {
Ok(Some(bytes)) => serde_json::Value::String(
String::from_utf8_lossy(&bytes).into_owned(),
),
_ => serde_json::Value::Null,
},
}
}
};
obj.insert(name, value);
}
serde_json::Value::Object(obj)
}
/// Decodes column `i` of `row` as `T`, returning `None` instead of an error.
///
/// A failed decode is logged as a warning with the column index and name.
/// This function never panics: the column name used in the log is looked up
/// with a checked access, so an out-of-range `i` (which `try_get` reports as
/// an error) is logged and mapped to `None` like any other failure.
fn try_get_or_null<'a, T>(row: &'a postgres::Row, i: usize) -> Option<T>
where
    T: postgres::types::FromSql<'a>,
{
    match row.try_get::<_, T>(i) {
        Ok(v) => Some(v),
        Err(e) => {
            // `row.columns()[i]` would panic for the very index error being reported.
            let name = row
                .columns()
                .get(i)
                .map_or("<out of range>", |col| col.name());
            tracing::warn!(
                "[postgres] decode failed for column {} ({}): {e}",
                i,
                name
            );
            None
        }
    }
}
/// Encodes `bytes` as base64 using the `A-Z a-z 0-9 + /` alphabet, padding the
/// output with `=` to a multiple of four characters.
fn b64(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for triple in bytes.chunks(3) {
        // Pack up to three bytes into the top 24 bits of a group; missing
        // trailing bytes stay zero.
        let mut group = 0u32;
        for (pos, &byte) in triple.iter().enumerate() {
            group |= u32::from(byte) << (16 - 8 * pos);
        }
        let sextet = |shift: u32| ALPHABET[((group >> shift) & 0x3F) as usize] as char;

        // The first two characters always carry data; the last two are
        // padding when the chunk is short.
        encoded.push(sextet(18));
        encoded.push(sextet(12));
        encoded.push(if triple.len() > 1 { sextet(6) } else { '=' });
        encoded.push(if triple.len() > 2 { sextet(0) } else { '=' });
    }
    encoded
}
fn pg_err(e: postgres::Error) -> StorageError {
use std::error::Error;
let mut detail = format!("{e}");
let mut src: Option<&dyn Error> = e.source();
while let Some(s) = src {
detail.push_str(": ");
detail.push_str(&format!("{s}"));
src = s.source();
}
StorageError {
code: "PG_QUERY_FAILED".into(),
message: format!("Postgres query failed: {detail}"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_manifest() -> AppManifest {
use pylon_kernel::{ManifestEntity, ManifestField, ManifestIndex};
let f = |name: &str, ty: &str, opt: bool, uniq: bool| ManifestField {
name: name.into(),
field_type: ty.into(),
optional: opt,
unique: uniq,
crdt: None,
};
AppManifest {
manifest_version: 1,
name: "test".into(),
version: "0.0.0".into(),
entities: vec![
ManifestEntity {
name: "User".into(),
fields: vec![
f("email", "string", false, true),
f("displayName", "string", false, false),
f("createdAt", "datetime", false, false),
],
indexes: vec![],
relations: vec![],
search: None,
crdt: true,
},
ManifestEntity {
name: "Todo".into(),
fields: vec![
f("title", "string", false, false),
f("done", "bool", false, false),
f("userId", "id(User)", false, false),
f("createdAt", "datetime", false, false),
],
indexes: vec![ManifestIndex {
name: "by_user".into(),
fields: vec!["userId".into()],
unique: false,
}],
relations: vec![],
search: None,
crdt: true,
},
],
queries: vec![],
actions: vec![],
policies: vec![],
routes: vec![],
auth: Default::default(),
}
}
/// Each manifest field type maps to its expected Postgres column type.
#[test]
fn pg_type_mapping() {
    let expected = [
        ("string", "TEXT"),
        ("int", "INTEGER"),
        ("float", "DOUBLE PRECISION"),
        ("bool", "BOOLEAN"),
        ("datetime", "TIMESTAMPTZ"),
        ("richtext", "TEXT"),
        ("id(User)", "TEXT"),
    ];
    for (field_type, column_type) in expected {
        assert_eq!(pg_column_type(field_type), column_type);
    }
}
/// Plain identifiers are wrapped in double quotes and otherwise unchanged.
#[test]
fn quote_ident_simple() {
    for ident in ["User", "email"] {
        assert_eq!(quote_ident(ident), format!("\"{ident}\""));
    }
}
/// Embedded double quotes are doubled inside the quoted identifier.
#[test]
fn quote_ident_escapes_embedded_double_quotes() {
    let cases = [
        (r#"col"name"#, r#""col""name""#),
        (r#"a"b"c"#, r#""a""b""c""#),
    ];
    for (raw, quoted) in cases {
        assert_eq!(quote_ident(raw), quoted);
    }
}
/// A required unique field and an optional field produce the expected
/// `CREATE TABLE` statement, including the implicit `id` primary key.
#[test]
fn create_table_sql_basic() {
    let spec = |name: &str, field_type: &str, optional: bool, unique: bool| FieldSpec {
        name: name.into(),
        field_type: field_type.into(),
        optional,
        unique,
    };
    let fields = vec![
        spec("email", "string", false, true),
        spec("age", "int", true, false),
    ];

    let expected = r#"CREATE TABLE IF NOT EXISTS "User" (id TEXT PRIMARY KEY NOT NULL, "email" TEXT NOT NULL UNIQUE, "age" INTEGER)"#;
    assert_eq!(create_table_sql("User", &fields), expected);
}
/// Double quotes in table and column names are escaped in the generated DDL.
#[test]
fn create_table_sql_escapes_identifiers() {
    let column = FieldSpec {
        name: r#"col"x"#.into(),
        field_type: "string".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql(r#"my"table"#, &vec![column]);
    for escaped in [r#""my""table""#, r#""col""x""#] {
        assert!(sql.contains(escaped));
    }
}
/// A unique index renders as `CREATE UNIQUE INDEX` named `<entity>_<index>`.
#[test]
fn create_index_sql_unique() {
    let expected = r#"CREATE UNIQUE INDEX IF NOT EXISTS "User_by_email" ON "User" ("email")"#;
    let actual = create_index_sql("User", "by_email", &["email".into()], true);
    assert_eq!(actual, expected);
}
/// A non-unique index renders as a plain `CREATE INDEX`.
#[test]
fn create_index_sql_non_unique() {
    let expected = r#"CREATE INDEX IF NOT EXISTS "Todo_by_user" ON "Todo" ("userId")"#;
    let actual = create_index_sql("Todo", "by_user", &["userId".into()], false);
    assert_eq!(actual, expected);
}
/// An optional, non-unique field adds a bare column with no constraints.
#[test]
fn add_column_sql_basic() {
    let bio = FieldSpec {
        name: "bio".into(),
        field_type: "string".into(),
        optional: true,
        unique: false,
    };
    let expected = r#"ALTER TABLE "User" ADD COLUMN "bio" TEXT"#;
    assert_eq!(add_column_sql("User", &bio), expected);
}
/// Planning from the fixture manifest creates both entities and the
/// `Todo.by_user` index.
#[test]
fn plan_from_manifest() {
    let adapter = PostgresAdapter;
    let plan = adapter.plan_schema(&test_manifest()).unwrap();

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_index = plan.operations.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_index);
}
/// The rendered plan has two `CREATE TABLE` statements in the first two
/// positions and at least one index statement.
#[test]
fn plan_to_sql_produces_statements() {
    let adapter = PostgresAdapter;
    let plan = adapter.plan_schema(&test_manifest()).unwrap();
    let stmts = plan_to_sql(&plan).unwrap();

    let mut table_count = 0;
    let mut index_count = 0;
    for stmt in stmts.iter() {
        if stmt.starts_with("CREATE TABLE") {
            table_count += 1;
        }
        if stmt.starts_with("CREATE INDEX") || stmt.starts_with("CREATE UNIQUE INDEX") {
            index_count += 1;
        }
    }

    assert_eq!(table_count, 2);
    assert!(index_count >= 1);
    assert!(stmts[0].starts_with("CREATE TABLE"));
    assert!(stmts[1].starts_with("CREATE TABLE"));
}
/// `RemoveEntity` cannot be rendered and is reported as `PG_OP_UNSUPPORTED`.
#[test]
fn plan_to_sql_rejects_unsupported() {
    let remove_user = SchemaOperation::RemoveEntity {
        name: "User".into(),
    };
    let plan = SchemaPlan {
        operations: vec![remove_user],
    };
    let err = plan_to_sql(&plan).unwrap_err();
    assert_eq!(err.code, "PG_OP_UNSUPPORTED");
}
/// The offline adapter refuses to apply a plan, even a no-op one.
#[test]
fn apply_not_implemented() {
    let noop_plan = SchemaPlan {
        operations: vec![SchemaOperation::Noop],
    };
    let adapter = PostgresAdapter;
    let err = adapter.apply_schema(&noop_plan).unwrap_err();
    assert_eq!(err.code, "APPLY_NOT_IMPLEMENTED");
}
/// Mixed-case names are quoted verbatim and `datetime` maps to `TIMESTAMPTZ`.
#[test]
fn sql_uses_quoted_identifiers() {
    let created_at = FieldSpec {
        name: "createdAt".into(),
        field_type: "datetime".into(),
        optional: false,
        unique: false,
    };
    let sql = create_table_sql("User", &vec![created_at]);
    for fragment in ["\"User\"", "\"createdAt\"", "TIMESTAMPTZ"] {
        assert!(sql.contains(fragment));
    }
}
/// Sanity-checks the introspection queries for the fragments they must contain.
#[test]
fn introspect_sql_constants_are_valid() {
    let required = [
        (INTROSPECT_TABLES_SQL, "information_schema.tables"),
        (INTROSPECT_COLUMNS_SQL, "$1"),
        (INTROSPECT_INDEXES_SQL, "$1"),
        (INTROSPECT_TABLES_SQL, "_pylon_"),
    ];
    for (query, fragment) in required {
        assert!(query.contains(fragment));
    }
}
/// Against an empty database snapshot, every entity and index must be created.
#[test]
fn plan_from_empty_snapshot_creates_all() {
    let empty = crate::SchemaSnapshot { tables: Vec::new() };
    let plan = plan_from_snapshot(&empty, &test_manifest());

    let creates_entity = |wanted: &str| {
        plan.operations.iter().any(|op| {
            matches!(op, SchemaOperation::CreateEntity { name, .. } if name == wanted)
        })
    };
    let adds_todo_index = plan.operations.iter().any(|op| {
        matches!(
            op,
            SchemaOperation::AddIndex { entity, name, .. } if entity == "Todo" && name == "by_user"
        )
    });

    assert!(creates_entity("User"));
    assert!(creates_entity("Todo"));
    assert!(adds_todo_index);
}
/// A snapshot that already contains every table, column and index of the
/// fixture manifest yields an empty plan.
#[test]
fn plan_from_full_snapshot_is_noop() {
    // All fixture columns are NOT NULL; only name, type and the primary-key
    // flag vary.
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("email", "TEXT", false),
            col("displayName", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: Vec::new(),
    };
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());
    assert!(plan.is_empty());
}
/// When the `User` table lacks `displayName` and `createdAt`, the plan
/// contains exactly two `AddField` operations.
#[test]
fn plan_detects_missing_column_in_snapshot() {
    // All fixture columns are NOT NULL; only name, type and the primary-key
    // flag vary.
    let col = |name: &str, column_type: &str, primary_key: bool| crate::ColumnSnapshot {
        name: name.into(),
        column_type: column_type.into(),
        notnull: true,
        primary_key,
    };

    // `User` has only `id` and `email`; the manifest additionally declares
    // `displayName` and `createdAt`.
    let user = crate::TableSnapshot {
        name: "User".into(),
        columns: vec![col("id", "TEXT", true), col("email", "TEXT", false)],
        indexes: Vec::new(),
    };
    // `Todo` is complete, including its index.
    let todo = crate::TableSnapshot {
        name: "Todo".into(),
        columns: vec![
            col("id", "TEXT", true),
            col("title", "TEXT", false),
            col("done", "BOOLEAN", false),
            col("userId", "TEXT", false),
            col("createdAt", "TIMESTAMPTZ", false),
        ],
        indexes: vec![crate::IndexSnapshot {
            name: "Todo_by_user".into(),
            columns: vec!["userId".into()],
            unique: false,
        }],
    };
    let snapshot = crate::SchemaSnapshot {
        tables: vec![user, todo],
    };

    let plan = plan_from_snapshot(&snapshot, &test_manifest());
    let add_field_count = plan
        .operations
        .iter()
        .filter(|op| matches!(op, SchemaOperation::AddField { .. }))
        .count();
    assert_eq!(add_field_count, 2);
}
/// Strings are returned as-is, null becomes the empty string, and every
/// other JSON value is rendered in its compact JSON form.
#[test]
fn json_value_to_string_handles_all_types() {
    let cases = [
        (serde_json::Value::String("hello".into()), "hello"),
        (serde_json::json!(42), "42"),
        (serde_json::json!(1.5), "1.5"),
        (serde_json::Value::Bool(true), "true"),
        (serde_json::Value::Bool(false), "false"),
        (serde_json::Value::Null, ""),
        (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        (serde_json::json!({"a": 1}), "{\"a\":1}"),
    ];
    for (value, rendered) in cases {
        assert_eq!(json_value_to_string(&value), rendered);
    }
}
/// Generated ids are non-empty and consist solely of ASCII hex digits.
#[test]
fn generate_id_returns_hex_string() {
    let id = generate_id();
    let all_hex = id.bytes().all(|byte| byte.is_ascii_hexdigit());
    assert!(!id.is_empty());
    assert!(all_hex);
}
/// Two consecutive calls never return the same id.
#[test]
fn generate_id_is_unique_across_calls() {
    let first = generate_id();
    let second = generate_id();
    assert_ne!(first, second);
}
/// Ids generated in a tight loop come out already sorted, all have the same
/// length, and never collide.
#[test]
fn generate_id_is_lex_sortable() {
    let mut ids: Vec<String> = std::iter::repeat_with(generate_id).take(1000).collect();

    // Sorting a copy must not reorder anything.
    let mut sorted = ids.clone();
    sorted.sort();
    assert_eq!(ids, sorted, "generate_id must be lex-monotonic");

    // Equal lengths are what make string order meaningful.
    let width = ids[0].len();
    assert!(ids.iter().all(|id| id.len() == width));

    ids.dedup();
    assert_eq!(ids.len(), 1000, "no collisions in a tight loop");
}
/// Two data fields give an INSERT with three placeholders, the first
/// parameter being a non-empty text id.
#[test]
fn build_insert_sql_simple() {
    let payload = serde_json::json!({
        "email": "alice@example.com",
        "displayName": "Alice"
    });
    let (sql, values) = build_insert_sql("User", &payload).unwrap();

    assert!(sql.starts_with("INSERT INTO \"User\""));
    for fragment in ["id", "$1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    match &values[0] {
        JsonParam::Text(id) => assert!(!id.is_empty()),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // One id parameter plus the two payload fields.
    assert_eq!(values.len(), 3);
}
/// Each JSON scalar keeps its type when turned into a query parameter.
#[test]
fn build_insert_sql_preserves_json_types() {
    let payload = serde_json::json!({
        "n": 42,
        "f": 1.5,
        "b": true,
        "s": "hi",
        "z": null,
    });
    let (_sql, values) = build_insert_sql("T", &payload).unwrap();

    // Skip the leading id parameter. The assertions below expect the data
    // parameters in key order (b, f, n, s, z), not in literal order.
    let data_params = &values[1..];
    assert!(matches!(&data_params[0], JsonParam::Bool(true)));
    assert!(matches!(&data_params[1], JsonParam::Float(_)));
    assert!(matches!(&data_params[2], JsonParam::Int(42)));
    assert!(matches!(&data_params[3], JsonParam::Text(_)));
    assert!(matches!(&data_params[4], JsonParam::Null));
}
/// Both the table name and mixed-case column names are quoted in the INSERT.
#[test]
fn build_insert_sql_quotes_column_names() {
    let payload = serde_json::json!({"createdAt": "2026-01-01"});
    let (sql, _) = build_insert_sql("Todo", &payload).unwrap();
    for quoted in ["\"createdAt\"", "\"Todo\""] {
        assert!(sql.contains(quoted));
    }
}
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_insert_sql_rejects_non_object() {
    let not_an_object = serde_json::json!("not an object");
    let err = build_insert_sql("User", &not_an_object).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
/// Two data fields give an UPDATE keyed on `$1`, with the row id as the
/// first parameter.
#[test]
fn build_update_sql_simple() {
    let payload = serde_json::json!({
        "displayName": "Bob",
        "email": "bob@example.com"
    });
    let (sql, values) = build_update_sql("User", "abc123", &payload).unwrap();

    assert!(sql.starts_with("UPDATE \"User\" SET"));
    for fragment in ["WHERE id = $1", "$2", "$3"] {
        assert!(sql.contains(fragment));
    }
    match &values[0] {
        JsonParam::Text(id) => assert_eq!(id, "abc123"),
        other => panic!("expected Text id param, got {other:?}"),
    }
    // One id parameter plus the two payload fields.
    assert_eq!(values.len(), 3);
}
/// The first data column is quoted and bound to `$2` (`$1` is the row id).
#[test]
fn build_update_sql_quotes_column_names() {
    let payload = serde_json::json!({"displayName": "Carol"});
    let (sql, _params) = build_update_sql("User", "id1", &payload).unwrap();
    let assignment = "\"displayName\" = $2";
    assert!(sql.contains(assignment));
}
/// A payload that is not a JSON object is rejected with `PG_INVALID_DATA`.
#[test]
fn build_update_sql_rejects_non_object() {
    let not_an_object = serde_json::json!(42);
    let err = build_update_sql("User", "id1", &not_an_object).unwrap_err();
    assert_eq!(err.code, "PG_INVALID_DATA");
}
/// An empty object has nothing to SET and is rejected with a message that
/// asks for at least one field.
#[test]
fn build_update_sql_rejects_empty_object() {
    let empty = serde_json::json!({});
    let StorageError { code, message } = build_update_sql("User", "id1", &empty).unwrap_err();
    assert_eq!(code, "PG_INVALID_DATA");
    assert!(message.contains("at least one field"));
}
}