use crate::core::{inventory, Model as _, ModelEntry, SqlValue};
use crate::sql::{Auto, ExecError, FetcherPool as _};
#[cfg(feature = "postgres")]
use crate::sql::{sqlx::PgPool, Fetcher as _};
use crate::Model;
/// Registry row mapping an application model to its database table —
/// the rustango analogue of Django's `django.contrib.contenttypes`.
///
/// Rows are keyed logically by the `(app_label, model_name)` natural key
/// (enforced by a unique index, see `CREATE_TABLE_SQL` / `ensure_table_pool`).
#[derive(Debug, Clone, Model)]
#[rustango(table = "rustango_content_types")]
pub struct ContentType {
    // Auto-incremented surrogate primary key.
    #[rustango(primary_key)]
    pub id: Auto<i64>,
    // Application label the model belongs to (defaults to "project" when
    // the model entry resolves none — see `ensure_seeded`).
    #[rustango(max_length = 100)]
    pub app_label: String,
    // Lowercased model type name.
    #[rustango(max_length = 100)]
    pub model_name: String,
    // Name of the model's backing table.
    #[rustango(max_length = 100)]
    pub table: String,
}
#[cfg(feature = "postgres")]
impl ContentType {
    /// Resolves the `ContentType` row registered for model `T`.
    ///
    /// Delegates to [`Self::for_model_pool`] through the backend-agnostic
    /// pool wrapper — the same delegation pattern the other Postgres entry
    /// points in this module use (`fetch_row_as_json`,
    /// `for_each_row_of_ct`, `GenericForeignKey::for_target`) — instead of
    /// duplicating the inventory lookup.
    ///
    /// # Errors
    /// `ExecError::MissingPrimaryKey` when `T` is not present in the model
    /// inventory; otherwise any driver error from the underlying query.
    pub async fn for_model<T: crate::core::Model>(
        pool: &PgPool,
    ) -> Result<Option<Self>, ExecError> {
        Self::for_model_pool::<T>(&crate::sql::Pool::Postgres(pool.clone())).await
    }

    /// Looks up a content type by its `(app_label, model_name)` natural
    /// key. Returns `Ok(None)` when no row matches.
    pub async fn by_natural_key(
        pool: &PgPool,
        app_label: &str,
        model_name: &str,
    ) -> Result<Option<Self>, ExecError> {
        let rows: Vec<Self> = Self::objects()
            .filter(
                "app_label",
                crate::core::Op::Eq,
                SqlValue::String(app_label.into()),
            )
            .filter(
                "model_name",
                crate::core::Op::Eq,
                SqlValue::String(model_name.into()),
            )
            .limit(1)
            .fetch(pool)
            .await?;
        Ok(rows.into_iter().next())
    }

    /// Looks up a content type by its surrogate primary key.
    pub async fn by_id(pool: &PgPool, id: i64) -> Result<Option<Self>, ExecError> {
        let rows: Vec<Self> = Self::objects()
            .filter("id", crate::core::Op::Eq, SqlValue::I64(id))
            .limit(1)
            .fetch(pool)
            .await?;
        Ok(rows.into_iter().next())
    }

    /// Returns every registered content type, ordered by
    /// `(app_label, model_name)` ascending.
    pub async fn all(pool: &PgPool) -> Result<Vec<Self>, ExecError> {
        let rows: Vec<Self> = Self::objects()
            .order_by(&[("app_label", false), ("model_name", false)])
            .fetch(pool)
            .await?;
        Ok(rows)
    }
}
impl ContentType {
    /// Backend-agnostic lookup by the `(app_label, model_name)` natural
    /// key. Returns `Ok(None)` when no row matches.
    pub async fn by_natural_key_pool(
        pool: &crate::sql::Pool,
        app_label: &str,
        model_name: &str,
    ) -> Result<Option<Self>, ExecError> {
        let app_value = SqlValue::String(app_label.into());
        let name_value = SqlValue::String(model_name.into());
        let mut found: Vec<Self> = Self::objects()
            .filter("app_label", crate::core::Op::Eq, app_value)
            .filter("model_name", crate::core::Op::Eq, name_value)
            .limit(1)
            .fetch_pool(pool)
            .await?;
        // `limit(1)` guarantees at most one element.
        Ok(found.pop())
    }

    /// Backend-agnostic lookup by surrogate primary key.
    pub async fn by_id_pool(pool: &crate::sql::Pool, id: i64) -> Result<Option<Self>, ExecError> {
        let mut found: Vec<Self> = Self::objects()
            .filter("id", crate::core::Op::Eq, SqlValue::I64(id))
            .limit(1)
            .fetch_pool(pool)
            .await?;
        Ok(found.pop())
    }

    /// Resolves the content type registered for model `T` by consulting
    /// the model inventory and then its natural key.
    ///
    /// # Errors
    /// `ExecError::MissingPrimaryKey` when `T` is absent from the
    /// inventory; otherwise any error from the query.
    pub async fn for_model_pool<T: crate::core::Model>(
        pool: &crate::sql::Pool,
    ) -> Result<Option<Self>, ExecError> {
        let wanted_table = T::SCHEMA.table;
        let entry = inventory::iter::<ModelEntry>
            .into_iter()
            .find(|candidate| candidate.schema.table == wanted_table)
            .ok_or_else(|| ExecError::MissingPrimaryKey {
                table: wanted_table,
            })?;
        let app_label = entry.resolved_app_label().unwrap_or("project");
        let model_name = T::SCHEMA.name.to_ascii_lowercase();
        Self::by_natural_key_pool(pool, app_label, &model_name).await
    }

    /// Returns every registered content type, ordered by
    /// `(app_label, model_name)` ascending.
    pub async fn all_pool(pool: &crate::sql::Pool) -> Result<Vec<Self>, ExecError> {
        Self::objects()
            .order_by(&[("app_label", false), ("model_name", false)])
            .fetch_pool(pool)
            .await
    }
}
/// Postgres convenience wrapper around [`fetch_row_as_json_pool`].
#[cfg(feature = "postgres")]
pub async fn fetch_row_as_json(
    pool: &PgPool,
    ct: &ContentType,
    pk: impl Into<SqlValue>,
) -> Result<Option<serde_json::Value>, ExecError> {
    let wrapped = crate::sql::Pool::Postgres(pool.clone());
    fetch_row_as_json_pool(&wrapped, ct, pk).await
}
/// Loads the single row of the table described by `ct` whose primary key
/// equals `pk`, serialized as a JSON object.
///
/// Returns `Ok(None)` when `ct.table` matches no registered model, or when
/// no row has that primary key.
///
/// # Errors
/// `ExecError::MissingPrimaryKey` when the resolved schema declares no
/// primary key; otherwise any driver error from the query.
pub async fn fetch_row_as_json_pool(
    pool: &crate::sql::Pool,
    ct: &ContentType,
    pk: impl Into<SqlValue>,
) -> Result<Option<serde_json::Value>, ExecError> {
    use crate::core::{Filter, Op, SelectQuery, WhereExpr};
    // Map the stored table name back to a registered model schema; an
    // unknown table is treated as "no row" rather than an error.
    let entry = inventory::iter::<ModelEntry>
        .into_iter()
        .find(|e| e.schema.table == ct.table.as_str());
    let Some(entry) = entry else {
        return Ok(None);
    };
    let pk_field = entry
        .schema
        .primary_key()
        .ok_or(ExecError::MissingPrimaryKey {
            table: entry.schema.table,
        })?;
    // SELECT <scalar fields> FROM <table> WHERE <pk> = <value> LIMIT 1.
    let select_q = SelectQuery {
        model: entry.schema,
        where_clause: WhereExpr::Predicate(Filter {
            column: pk_field.column,
            op: Op::Eq,
            value: pk.into(),
        }),
        search: None,
        joins: vec![],
        order_by: vec![],
        limit: Some(1),
        offset: None,
    };
    let fields: Vec<&'static crate::core::FieldSchema> = entry.schema.scalar_fields().collect();
    crate::sql::select_one_row_as_json_pool(pool, &select_q, &fields).await
}
/// Postgres convenience wrapper around [`for_each_row_of_ct_pool`].
#[cfg(feature = "postgres")]
pub async fn for_each_row_of_ct<F>(
    pool: &PgPool,
    ct: &ContentType,
    batch_size: u32,
    f: F,
) -> Result<usize, ExecError>
where
    F: FnMut(serde_json::Value) -> Result<(), ExecError>,
{
    let wrapped = crate::sql::Pool::Postgres(pool.clone());
    for_each_row_of_ct_pool(&wrapped, ct, batch_size, f).await
}
/// Streams every row of the table behind `ct` through callback `f`, in
/// primary-key order, fetching `batch_size` rows per query. Returns the
/// number of rows visited (0 when `ct.table` matches no registered model).
///
/// NOTE(review): LIMIT/OFFSET pagination assumes the table is not modified
/// concurrently; inserts or deletes mid-scan can cause rows to be skipped
/// or visited twice — confirm callers can tolerate that.
pub async fn for_each_row_of_ct_pool<F>(
    pool: &crate::sql::Pool,
    ct: &ContentType,
    batch_size: u32,
    mut f: F,
) -> Result<usize, ExecError>
where
    F: FnMut(serde_json::Value) -> Result<(), ExecError>,
{
    use crate::core::{OrderClause, SelectQuery, WhereExpr};
    // Resolve the runtime table name back to a registered model schema.
    let entry = inventory::iter::<ModelEntry>
        .into_iter()
        .find(|e| e.schema.table == ct.table.as_str());
    let Some(entry) = entry else {
        return Ok(0);
    };
    let pk_field = entry
        .schema
        .primary_key()
        .ok_or(ExecError::MissingPrimaryKey {
            table: entry.schema.table,
        })?;
    // Clamp to at least 1 so the loop always makes forward progress.
    let batch = batch_size.max(1) as i64;
    let fields: Vec<&'static crate::core::FieldSchema> = entry.schema.scalar_fields().collect();
    let mut visited = 0_usize;
    let mut offset = 0_i64;
    loop {
        // SELECT <scalars> ORDER BY <pk> ASC LIMIT batch OFFSET offset.
        let select_q = SelectQuery {
            model: entry.schema,
            where_clause: WhereExpr::And(vec![]),
            search: None,
            joins: vec![],
            order_by: vec![OrderClause {
                column: pk_field.column,
                desc: false,
            }],
            limit: Some(batch),
            offset: Some(offset),
        };
        let rows = crate::sql::select_rows_as_json_pool(pool, &select_q, &fields).await?;
        if rows.is_empty() {
            break;
        }
        let n = rows.len() as i64;
        for json in rows {
            // Abort the whole scan on the first callback error.
            f(json)?;
            visited += 1;
        }
        // A short page means we have reached the end; skip the extra query.
        if n < batch {
            break;
        }
        offset += batch;
    }
    Ok(visited)
}
/// A Django-style generic foreign key: identifies an arbitrary row by the
/// id of its [`ContentType`] plus the row's `i64` primary key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GenericForeignKey {
    // Foreign key into `rustango_content_types.id`.
    pub content_type_id: i64,
    // Primary key of the referenced row in the target model's table.
    pub object_pk: i64,
}
impl GenericForeignKey {
    /// Builds a generic FK from an already-known content-type id and
    /// target primary key.
    #[must_use]
    pub const fn new(content_type_id: i64, object_pk: i64) -> Self {
        Self {
            content_type_id,
            object_pk,
        }
    }

    /// Postgres convenience wrapper around [`Self::for_target_pool`].
    #[cfg(feature = "postgres")]
    pub async fn for_target<T: crate::core::Model>(
        pool: &PgPool,
        object_pk: i64,
    ) -> Result<Self, ExecError> {
        let wrapped = crate::sql::Pool::Postgres(pool.clone());
        Self::for_target_pool::<T>(&wrapped, object_pk).await
    }

    /// Builds the generic FK pointing at row `object_pk` of model `T`,
    /// first resolving `T`'s content type and its persisted id.
    ///
    /// # Errors
    /// `ExecError::MissingPrimaryKey` when `T` has no registered content
    /// type, or when the content-type row carries no persisted id.
    pub async fn for_target_pool<T: crate::core::Model>(
        pool: &crate::sql::Pool,
        object_pk: i64,
    ) -> Result<Self, ExecError> {
        let content_type = ContentType::for_model_pool::<T>(pool)
            .await?
            .ok_or_else(|| ExecError::MissingPrimaryKey {
                table: T::SCHEMA.table,
            })?;
        let content_type_id = content_type
            .id
            .get()
            .copied()
            .ok_or_else(|| ExecError::MissingPrimaryKey {
                table: ContentType::SCHEMA.table,
            })?;
        Ok(Self::new(content_type_id, object_pk))
    }
}
/// Renders an HTML anchor for the object a generic FK points at, or an
/// `<em>` placeholder when the content-type id is unknown.
#[cfg(feature = "postgres")]
pub async fn render_generic_fk_link(
    pool: &PgPool,
    gfk: GenericForeignKey,
) -> Result<String, ExecError> {
    // HTML-escape the five XSS-significant characters. The previous
    // version replaced each character with itself (the entity names had
    // been lost), making the escaping a no-op and the link XSS-prone.
    let escape = |s: &str| -> String {
        s.replace('&', "&amp;")
            .replace('<', "&lt;")
            .replace('>', "&gt;")
            .replace('"', "&quot;")
            .replace('\'', "&#x27;")
    };
    let ct = match ContentType::by_id(pool, gfk.content_type_id).await? {
        Some(c) => c,
        None => {
            // Unknown content type: render an inert placeholder instead of
            // a dead link.
            return Ok(format!(
                "<em>(ct={}, pk={})</em>",
                gfk.content_type_id, gfk.object_pk
            ));
        }
    };
    let label = format!("{}.{}", ct.app_label, ct.model_name);
    let table_esc = escape(&ct.table);
    let label_esc = escape(&label);
    Ok(format!(
        r#"<a href="/{table}/{pk}">{label} #{pk}</a>"#,
        table = table_esc,
        pk = gfk.object_pk,
        label = label_esc,
    ))
}
pub async fn render_generic_fk_link_pool(
pool: &crate::sql::Pool,
gfk: GenericForeignKey,
) -> Result<String, ExecError> {
let escape = |s: &str| -> String {
s.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
.replace('\'', "'")
};
let ct = match ContentType::by_id_pool(pool, gfk.content_type_id).await? {
Some(c) => c,
None => {
return Ok(format!(
"<em>(ct={}, pk={})</em>",
gfk.content_type_id, gfk.object_pk
));
}
};
let label = format!("{}.{}", ct.app_label, ct.model_name);
let table_esc = escape(&ct.table);
let label_esc = escape(&label);
Ok(format!(
r#"<a href="/{table}/{pk}">{label} #{pk}</a>"#,
table = table_esc,
pk = gfk.object_pk,
label = label_esc,
))
}
/// Batch-loads children referencing any of `parent_pks` through the soft
/// FK column `target_fk_column`, grouped by the parent key that `extract`
/// reads off each child. Empty input yields an empty map with no query.
#[cfg(feature = "postgres")]
pub async fn prefetch_soft<C, F>(
    pool: &PgPool,
    parent_pks: &[i64],
    target_fk_column: &'static str,
    extract: F,
) -> Result<::std::collections::HashMap<i64, Vec<C>>, ExecError>
where
    C: crate::core::Model
        + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
        + Send
        + Unpin
        + 'static,
    F: Fn(&C) -> i64,
{
    if parent_pks.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    // Deduplicate so the IN-list stays minimal.
    let mut unique_parents = parent_pks.to_vec();
    unique_parents.sort_unstable();
    unique_parents.dedup();
    let in_list: Vec<crate::core::SqlValue> = unique_parents
        .into_iter()
        .map(crate::core::SqlValue::I64)
        .collect();
    let children: Vec<C> = crate::query::QuerySet::<C>::new()
        .filter(
            target_fk_column,
            crate::core::Op::In,
            crate::core::SqlValue::List(in_list),
        )
        .fetch(pool)
        .await?;
    let mut by_parent: ::std::collections::HashMap<i64, Vec<C>> =
        ::std::collections::HashMap::new();
    for child in children {
        by_parent.entry(extract(&child)).or_default().push(child);
    }
    Ok(by_parent)
}
/// Batch-resolves generic FK pairs `(content_type_id, object_pk)` whose
/// content type matches model `C`, returning the fetched rows keyed by the
/// original pair. Pairs pointing at other content types are ignored.
///
/// # Errors
/// `ExecError::MissingPrimaryKey` when `C` has no registered content type,
/// the content-type row has no persisted id, or `C` declares no primary
/// key; otherwise any driver error from the query.
#[cfg(feature = "postgres")]
pub async fn prefetch_generic<C>(
    pool: &PgPool,
    pairs: &[(i64, i64)],
) -> Result<::std::collections::HashMap<(i64, i64), C>, ExecError>
where
    C: crate::core::Model
        + for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>
        + crate::sql::HasPkValue
        + Send
        + Unpin
        + 'static,
{
    if pairs.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    // Resolve C's content type so we can discriminate which pairs target it.
    let target_ct =
        ContentType::for_model::<C>(pool)
            .await?
            .ok_or_else(|| ExecError::MissingPrimaryKey {
                table: C::SCHEMA.table,
            })?;
    let target_ct_id = target_ct
        .id
        .get()
        .copied()
        .ok_or_else(|| ExecError::MissingPrimaryKey {
            table: ContentType::SCHEMA.table,
        })?;
    // Keep only the pks of pairs that actually point at C's content type.
    let mut wanted_pks: Vec<i64> = pairs
        .iter()
        .filter(|(ct, _)| *ct == target_ct_id)
        .map(|(_, pk)| *pk)
        .collect();
    if wanted_pks.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    wanted_pks.sort_unstable();
    wanted_pks.dedup();
    let pk_values: Vec<crate::core::SqlValue> = wanted_pks
        .iter()
        .copied()
        .map(crate::core::SqlValue::I64)
        .collect();
    let pk_field = C::SCHEMA
        .primary_key()
        .ok_or_else(|| ExecError::MissingPrimaryKey {
            table: C::SCHEMA.table,
        })?;
    let rows: Vec<C> = crate::query::QuerySet::<C>::new()
        .filter(
            pk_field.column,
            crate::core::Op::In,
            crate::core::SqlValue::List(pk_values),
        )
        .fetch(pool)
        .await?;
    let mut out: ::std::collections::HashMap<(i64, i64), C> =
        ::std::collections::HashMap::with_capacity(rows.len());
    for row in rows {
        let pk_value = <C as crate::sql::HasPkValue>::__rustango_pk_value_impl(&row);
        // NOTE(review): rows whose pk is not SqlValue::I64 are silently
        // dropped — presumably generic targets always use i64 pks; confirm.
        if let crate::core::SqlValue::I64(pk) = pk_value {
            out.insert((target_ct_id, pk), row);
        }
    }
    Ok(out)
}
/// Backend-agnostic variant of [`prefetch_soft`]: batch-loads children
/// referencing any of `parent_pks` through `target_fk_column` and groups
/// them by the parent key `extract` reads off each child.
pub async fn prefetch_soft_pool<C, F>(
    pool: &crate::sql::Pool,
    parent_pks: &[i64],
    target_fk_column: &'static str,
    extract: F,
) -> Result<::std::collections::HashMap<i64, Vec<C>>, ExecError>
where
    C: crate::core::Model
        + crate::sql::MaybePgFromRow
        + crate::sql::MaybeMyFromRow
        + crate::sql::MaybeSqliteFromRow
        + crate::sql::LoadRelated
        + crate::sql::MaybeMyLoadRelated
        + crate::sql::MaybeSqliteLoadRelated
        + Send
        + Unpin
        + 'static,
    F: Fn(&C) -> i64,
{
    use crate::sql::FetcherPool as _;
    if parent_pks.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    // Deduplicate so the IN-list stays minimal.
    let mut unique_parents = parent_pks.to_vec();
    unique_parents.sort_unstable();
    unique_parents.dedup();
    let in_list: Vec<crate::core::SqlValue> = unique_parents
        .into_iter()
        .map(crate::core::SqlValue::I64)
        .collect();
    let children: Vec<C> = crate::query::QuerySet::<C>::new()
        .filter(
            target_fk_column,
            crate::core::Op::In,
            crate::core::SqlValue::List(in_list),
        )
        .fetch_pool(pool)
        .await?;
    let mut by_parent: ::std::collections::HashMap<i64, Vec<C>> =
        ::std::collections::HashMap::new();
    for child in children {
        by_parent.entry(extract(&child)).or_default().push(child);
    }
    Ok(by_parent)
}
/// Backend-agnostic variant of [`prefetch_generic`]: batch-resolves the
/// generic FK pairs `(content_type_id, object_pk)` that target model `C`,
/// returning the fetched rows keyed by the original pair. Pairs pointing
/// at other content types are ignored.
pub async fn prefetch_generic_pool<C>(
    pool: &crate::sql::Pool,
    pairs: &[(i64, i64)],
) -> Result<::std::collections::HashMap<(i64, i64), C>, ExecError>
where
    C: crate::core::Model
        + crate::sql::MaybePgFromRow
        + crate::sql::MaybeMyFromRow
        + crate::sql::MaybeSqliteFromRow
        + crate::sql::LoadRelated
        + crate::sql::MaybeMyLoadRelated
        + crate::sql::MaybeSqliteLoadRelated
        + crate::sql::HasPkValue
        + Send
        + Unpin
        + 'static,
{
    use crate::sql::FetcherPool as _;
    if pairs.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    // Resolve C's content type so we can discriminate which pairs target it.
    let target_ct = ContentType::for_model_pool::<C>(pool)
        .await?
        .ok_or_else(|| ExecError::MissingPrimaryKey {
            table: C::SCHEMA.table,
        })?;
    let target_ct_id = target_ct
        .id
        .get()
        .copied()
        .ok_or_else(|| ExecError::MissingPrimaryKey {
            table: ContentType::SCHEMA.table,
        })?;
    // Keep only the pks of pairs that actually point at C's content type.
    let mut wanted_pks: Vec<i64> = pairs
        .iter()
        .filter(|(ct, _)| *ct == target_ct_id)
        .map(|(_, pk)| *pk)
        .collect();
    if wanted_pks.is_empty() {
        return Ok(::std::collections::HashMap::new());
    }
    wanted_pks.sort_unstable();
    wanted_pks.dedup();
    let pk_values: Vec<crate::core::SqlValue> = wanted_pks
        .iter()
        .copied()
        .map(crate::core::SqlValue::I64)
        .collect();
    let pk_field = C::SCHEMA
        .primary_key()
        .ok_or_else(|| ExecError::MissingPrimaryKey {
            table: C::SCHEMA.table,
        })?;
    let rows: Vec<C> = crate::query::QuerySet::<C>::new()
        .filter(
            pk_field.column,
            crate::core::Op::In,
            crate::core::SqlValue::List(pk_values),
        )
        .fetch_pool(pool)
        .await?;
    let mut out: ::std::collections::HashMap<(i64, i64), C> =
        ::std::collections::HashMap::with_capacity(rows.len());
    for row in rows {
        let pk_value = <C as crate::sql::HasPkValue>::__rustango_pk_value_impl(&row);
        // NOTE(review): rows whose pk is not SqlValue::I64 are silently
        // dropped — presumably generic targets always use i64 pks; confirm.
        if let crate::core::SqlValue::I64(pk) = pk_value {
            out.insert((target_ct_id, pk), row);
        }
    }
    Ok(out)
}
// Postgres-specific bootstrap DDL used by `ensure_table`; the dialect-aware
// path (`ensure_table_pool`) generates equivalent DDL per backend instead.
// "table" is quoted throughout because it collides with the SQL keyword.
const CREATE_TABLE_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS "rustango_content_types" (
"id" BIGSERIAL PRIMARY KEY,
"app_label" VARCHAR(100) NOT NULL,
"model_name" VARCHAR(100) NOT NULL,
"table" VARCHAR(100) NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS "rustango_content_types_natural_key"
ON "rustango_content_types" ("app_label", "model_name");
"#;
/// Creates the content-types table and its natural-key unique index on a
/// Postgres pool. Idempotent via `IF NOT EXISTS` in the DDL.
#[cfg(feature = "postgres")]
pub async fn ensure_table(pool: &PgPool) -> Result<(), crate::sql::sqlx::Error> {
    // Execute the bootstrap script one statement at a time — presumably the
    // query interface expects a single statement per call (TODO confirm).
    // Splitting on ';' is safe here only because CREATE_TABLE_SQL contains
    // no literal semicolons inside string values.
    for stmt in CREATE_TABLE_SQL.split(';') {
        let trimmed = stmt.trim();
        if trimmed.is_empty() {
            continue;
        }
        crate::sql::sqlx::query(trimmed).execute(pool).await?;
    }
    Ok(())
}
/// Seeds one content-type row per registered model (skipping the
/// `ContentType` model itself), creating the table first. Returns the
/// number of rows inserted.
///
/// NOTE(review): the exists-check and insert are not atomic; two processes
/// seeding concurrently can race, and the loser would then hit the
/// natural-key unique index — confirm this only runs once at startup.
#[cfg(feature = "postgres")]
pub async fn ensure_seeded(pool: &PgPool) -> Result<usize, ExecError> {
    ensure_table(pool).await.map_err(ExecError::Driver)?;
    let mut inserted = 0_usize;
    for entry in inventory::iter::<ModelEntry> {
        let table = entry.schema.table;
        // Never register the registry itself.
        if table == ContentType::SCHEMA.table {
            continue;
        }
        let app = entry.resolved_app_label().unwrap_or("project").to_owned();
        let name = entry.schema.name.to_ascii_lowercase();
        // Idempotency: skip models already present under their natural key.
        if ContentType::by_natural_key(pool, &app, &name)
            .await?
            .is_some()
        {
            continue;
        }
        let mut row = ContentType {
            id: Auto::Unset,
            app_label: app,
            model_name: name,
            table: table.to_owned(),
        };
        row.insert(pool).await?;
        inserted += 1;
    }
    Ok(inserted)
}
/// Dialect-aware variant of [`ensure_table`]: generates and runs
/// CREATE TABLE / CREATE UNIQUE INDEX DDL for whichever backend `pool`
/// wraps. Idempotent on all supported dialects.
pub async fn ensure_table_pool(pool: &crate::sql::Pool) -> Result<(), crate::sql::sqlx::Error> {
    use crate::core::Model as _;
    let dialect = pool.dialect();
    let table_ddl = crate::migrate::ddl::create_table_if_not_exists_sql_with_dialect(
        dialect,
        &ContentType::SCHEMA,
    );
    exec_one(pool, &table_ddl).await?;
    let table_q = dialect.quote_ident("rustango_content_types");
    let idx_q = dialect.quote_ident("rustango_content_types_natural_key");
    let col_app = dialect.quote_ident("app_label");
    let col_name = dialect.quote_ident("model_name");
    // MySQL has no CREATE INDEX IF NOT EXISTS; issue a plain CREATE there
    // and tolerate the duplicate-index error in the match below.
    let supports_if_not_exists = matches!(dialect.name(), "postgres" | "sqlite");
    let index_ddl = if supports_if_not_exists {
        format!("CREATE UNIQUE INDEX IF NOT EXISTS {idx_q} ON {table_q} ({col_app}, {col_name})")
    } else {
        format!("CREATE UNIQUE INDEX {idx_q} ON {table_q} ({col_app}, {col_name})")
    };
    match exec_one(pool, &index_ddl).await {
        Ok(()) => Ok(()),
        // "Index already exists" on MySQL is the idempotent success case.
        Err(e) if dialect.name() == "mysql" && is_mysql_dup_index_error(&e) => Ok(()),
        Err(e) => Err(e),
    }
}
/// Runs a single SQL statement against whichever concrete pool variant is
/// active. Only feature-enabled backends contribute match arms.
async fn exec_one(pool: &crate::sql::Pool, sql: &str) -> Result<(), crate::sql::sqlx::Error> {
    match pool {
        #[cfg(feature = "postgres")]
        crate::sql::Pool::Postgres(pg) => {
            crate::sql::sqlx::query(sql).execute(pg).await?;
        }
        #[cfg(feature = "mysql")]
        crate::sql::Pool::Mysql(my) => {
            crate::sql::sqlx::query(sql).execute(my).await?;
        }
        #[cfg(feature = "sqlite")]
        crate::sql::Pool::Sqlite(sq) => {
            crate::sql::sqlx::query(sql).execute(sq).await?;
        }
    }
    Ok(())
}
/// Returns `true` when `e` is MySQL error 1061 ("Duplicate key name"),
/// i.e. the index we tried to create already exists.
///
/// MySQL reports error 1061 under SQLSTATE `42000`, but that SQLSTATE also
/// covers plain syntax errors and many other failures. The previous `||`
/// logic treated *any* 42000 as "duplicate index", which made
/// `ensure_table_pool` silently swallow real DDL errors on MySQL. Match on
/// the specific 1061 message text instead.
#[cfg(feature = "mysql")]
fn is_mysql_dup_index_error(e: &crate::sql::sqlx::Error) -> bool {
    if let crate::sql::sqlx::Error::Database(db) = e {
        return db.message().contains("Duplicate key name");
    }
    false
}
/// Stub used when the `mysql` feature is off: no error can be a MySQL
/// duplicate-index error. `dead_code` allowed because the only caller
/// guards the call behind `dialect.name() == "mysql"`.
#[cfg(not(feature = "mysql"))]
#[allow(dead_code)]
fn is_mysql_dup_index_error(_e: &crate::sql::sqlx::Error) -> bool {
    false
}
/// Dialect-aware variant of [`ensure_seeded`]: seeds one content-type row
/// per registered model (skipping the `ContentType` model itself) after
/// ensuring the table exists. Returns the number of rows inserted.
///
/// NOTE(review): the exists-check and insert are not atomic; concurrent
/// seeders can race on the natural-key unique index — confirm this only
/// runs once at startup.
pub async fn ensure_seeded_pool(pool: &crate::sql::Pool) -> Result<usize, ExecError> {
    ensure_table_pool(pool).await.map_err(ExecError::Driver)?;
    let mut inserted = 0_usize;
    for entry in inventory::iter::<ModelEntry> {
        let table = entry.schema.table;
        // Never register the registry itself.
        if table == ContentType::SCHEMA.table {
            continue;
        }
        let app = entry.resolved_app_label().unwrap_or("project").to_owned();
        let name = entry.schema.name.to_ascii_lowercase();
        // Idempotency: skip models already present under their natural key.
        if ContentType::by_natural_key_pool(pool, &app, &name)
            .await?
            .is_some()
        {
            continue;
        }
        let mut row = ContentType {
            id: Auto::Unset,
            app_label: app,
            model_name: name,
            table: table.to_owned(),
        };
        row.insert_pool(pool).await?;
        inserted += 1;
    }
    Ok(inserted)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The derived schema must expose every declared column on the
    /// canonical content-types table.
    #[test]
    fn content_type_schema_has_expected_columns() {
        let schema = ContentType::SCHEMA;
        assert_eq!(schema.table, "rustango_content_types");
        let columns: Vec<&str> = schema.fields.iter().map(|field| field.column).collect();
        for expected in ["id", "app_label", "model_name", "table"] {
            assert!(columns.contains(&expected));
        }
    }

    /// The primary key must be the auto-incremented `id` column.
    #[test]
    fn content_type_id_is_auto() {
        let pk = ContentType::SCHEMA
            .primary_key()
            .expect("ContentType has a PK");
        assert_eq!(pk.column, "id");
        assert!(pk.auto, "ContentType.id should be Auto<i64>");
    }
}