"""{{ model_name }} model and related types."""
from __future__ import annotations
from dataclasses import asdict, is_dataclass
from pydantic import BaseModel, Field
from typing import TYPE_CHECKING, Any, Dict, List, Optional, ClassVar
{%- if needs_typeddict %}
from typing import TypedDict, NotRequired, Literal, Union
from datetime import datetime
from uuid import UUID
from decimal import Decimal
{%- else %}
{%- if has_datetime %}
from datetime import datetime
{%- endif %}
{%- if has_uuid %}
from uuid import UUID
{%- endif %}
{%- if has_decimal %}
from decimal import Decimal
{%- endif %}
{%- endif %}
from enum import Enum
from .._internal.descriptors import classproperty
{% if has_enums %}
from ..enums.enums import {% for e in enum_imports %}{{ e }}{% if not loop.last %}, {% endif %}{% endfor %}
{%- endif %}
{%- if has_composite_types %}
from ..types.types import {% for t in composite_type_imports %}{{ t }}{% if not loop.last %}, {% endif %}{% endfor %}
{%- endif %}
if TYPE_CHECKING:
{%- if has_relations %}
from . import {% for r in relation_imports %}{{ r }}{% if not loop.last %}, {% endif %}{% endfor %}
{%- endif %}
{%- if has_includes %}
{%- for field in include_fields %}
from .{{ field.target_snake }} import (
{{ field.target_model }}WhereInput,
{{ field.target_model }}OrderByInput,
{{ field.target_model }}ScalarFieldKeys,
{%- for depth in range(end=recursive_type_depth) %}
{{ field.target_model }}IncludeRecursive{{ depth + 1 }},
{%- endfor %}
)
{%- endfor %}
{%- endif %}
from .._internal.client import NautilusClient
class {{ model_name }}(BaseModel):
"""{{ model_name }} model mapped to '{{ table_name }}' table."""
{%- for f in scalar_fields %}
{{ f.name }}: {{ f.model_python_type }}{% if f.model_has_default %} = {{ f.model_default }}{% endif %}
{%- endfor %}
{%- for f in relation_fields %}
{{ f.name }}: {{ f.python_type }} = {{ f.default }}
{%- endfor %}
nautilus: ClassVar["{{ delegate_name }}"]
if TYPE_CHECKING:
# Type hint for IDE autocomplete - at runtime this is a classproperty descriptor
nautilus: "{{ delegate_name }}"
else:
@classproperty
def nautilus(cls) -> "{{ delegate_name }}":
"""Access the auto-registered client delegate for {{ model_name }}.
This property allows calling model operations directly on the class:
users = await User.nautilus.find_many(where={"role": "ADMIN"}, take=10)
user = await User.nautilus.create({"email": "alice@example.com"})
Requires a Nautilus instance with auto_register=True:
async with Nautilus(auto_register=True):
await User.nautilus.find_many()
Returns:
The delegate instance for this model.
Raises:
RuntimeError: If no auto-registered Nautilus instance exists.
"""
from .._internal.client import NautilusClient
client = NautilusClient.get_global_instance()
if client is None:
raise RuntimeError(
f"No Nautilus instance with auto_register=True found. "
f"Create one with: Nautilus(auto_register=True)"
)
return client.get_delegate("{{ snake_name }}")
# Engine response key mapping: "table__db_column" -> Python field name
_{{ table_name }}_row_map: Dict[str, str] = {
{%- for f in scalar_fields %}
"{{ table_name }}__{{ f.db_name }}": "{{ f.name }}",
{%- endfor %}
}
_{{ table_name }}_fields: frozenset = frozenset({
{%- for f in scalar_fields %}
"{{ f.name }}",
{%- endfor %}
})
def _coerce_boolean_value(value: Any) -> Any:
if isinstance(value, bool):
return value
if isinstance(value, int) and value in (0, 1):
return bool(value)
if isinstance(value, str):
lowered = value.lower()
if lowered in ("true", "1"):
return True
if lowered in ("false", "0"):
return False
return value
_{{ table_name }}_scalar_coercers: Dict[str, Any] = {
{%- for f in scalar_fields %}
{%- if f.is_enum %}
"{{ f.name }}": {{ f.base_type }},
{%- elif f.base_type == "bool" %}
"{{ f.name }}": _coerce_boolean_value,
{%- elif f.base_type == "datetime" %}
"{{ f.name }}": lambda v: datetime.fromisoformat(v.replace(" ", "T")) if isinstance(v, str) else v,
{%- endif %}
{%- endfor %}
}
# Python field name -> DB column name mapping (handles @map renames)
_{{ table_name }}_py_to_db: Dict[str, str] = {
{%- for f in scalar_fields %}
"{{ f.logical_name }}": "{{ f.db_name }}",
{%- if f.name != f.logical_name %}
"{{ f.name }}": "{{ f.db_name }}",
{%- endif %}
{%- endfor %}
}
_{{ table_name }}_py_to_logical: Dict[str, str] = {
{%- for f in scalar_fields %}
"{{ f.logical_name }}": "{{ f.logical_name }}",
{%- if f.name != f.logical_name %}
"{{ f.name }}": "{{ f.logical_name }}",
{%- endif %}
{%- endfor %}
}
_MISSING = object()
{% include "input_types.py.tera" %}
def _serialize_wire_value(value: Any) -> Any:
"""Convert generated client values into JSON-RPC-safe payloads."""
if isinstance(value, Enum):
return value.value
if is_dataclass(value):
return {k: _serialize_wire_value(v) for k, v in asdict(value).items()}
if isinstance(value, list):
return [_serialize_wire_value(item) for item in value]
if isinstance(value, dict):
return {k: _serialize_wire_value(v) for k, v in value.items()}
return value
def _process_create_data(data: Dict[str, Any], py_to_db: Dict[str, str]) -> Dict[str, Any]:
"""Convert CreateInput/UpdateInput dict to DB column names."""
result = {}
for key, value in data.items():
db_key = py_to_db.get(key, key)
result[db_key] = _serialize_wire_value(value)
return result
def _process_where_filters(where: Dict[str, Any], py_to_db: Dict[str, str]) -> Dict[str, Any]:
"""Convert WhereInput format to internal filter format."""
result = {}
for key, value in where.items():
if key in ('AND', 'OR', 'NOT'):
if isinstance(value, list):
result[key] = [_process_where_filters(item, py_to_db) for item in value]
else:
result[key] = _process_where_filters(value, py_to_db)
else:
db_key = py_to_db.get(key, key)
if isinstance(value, dict):
# Filter object with operators – keep nested {field: {op: value}} format
# so the engine's parse_field_condition receives {"id": {"in": [...]}}.
ops = {}
for op, op_value in value.items():
# Convert Python-safe names back (in_ -> in, not_ -> not)
actual_op = op.rstrip('_') if op.endswith('_') and op[:-1] in ('in', 'not') else op
coerced = _serialize_wire_value(op_value)
if actual_op == 'equals':
# 'equals' shorthand maps to direct equality
result[db_key] = coerced
else:
ops[actual_op] = coerced
if ops:
result[db_key] = ops
else:
result[db_key] = _serialize_wire_value(value)
return result
def _process_select_fields(select: Dict[str, bool], py_to_logical: Dict[str, str]) -> Dict[str, bool]:
"""Convert Python field names to logical field names for query projection."""
result = {}
for key, value in select.items():
logical_key = py_to_logical.get(key, key)
result[logical_key] = value
return result
def _serialize_include(include: Any) -> Any:
"""Serialize include dict to the engine wire format.
Converts Python snake_case keys (``order_by``) to camelCase (``orderBy``)
and recursively processes nested includes.
"""
if include is None or isinstance(include, bool):
return include
if not isinstance(include, dict):
return include
result: Dict[str, Any] = {}
for field, spec in include.items():
if isinstance(spec, bool):
result[field] = spec
elif isinstance(spec, dict):
node: Dict[str, Any] = {}
for k, v in spec.items():
if k == "order_by":
if isinstance(v, list):
node["orderBy"] = v
elif isinstance(v, dict):
node["orderBy"] = [{fk: fv} for fk, fv in v.items()]
else:
node["orderBy"] = v
elif k == "include":
node["include"] = _serialize_include(v)
else:
node[k] = v
result[field] = node
else:
result[field] = spec
return result
def _get_wire_value(row: Dict[str, Any], *keys: str) -> Any:
for key in keys:
if key in row:
return row[key]
return _MISSING
def _coerce_{{ snake_name }}_scalar(py_key: str, value: Any) -> Any:
if py_key in _{{ table_name }}_scalar_coercers:
return _{{ table_name }}_scalar_coercers[py_key](value)
return value
def _hydrate_{{ snake_name }}_relation(field_name: str, value: Any) -> Any:
{%- for field in include_fields %}
if field_name == "{{ field.name }}":
from .{{ field.target_snake }} import _{{ field.target_snake }}_from_wire
{%- if field.is_array %}
if value is None:
return []
if isinstance(value, list):
return [
_{{ field.target_snake }}_from_wire(item)
for item in value
if isinstance(item, dict)
]
return []
{%- else %}
if value is None:
return None
if isinstance(value, dict):
return _{{ field.target_snake }}_from_wire(value)
return None
{%- endif %}
{%- endfor %}
return value
def _{{ snake_name }}_from_wire(row: Dict[str, Any]) -> "{{ model_name }}":
"""Convert engine rows and nested include payloads to {{ model_name }}."""
kwargs: Dict[str, Any] = {}
{%- for f in scalar_fields %}
value = _get_wire_value(row, "{{ table_name }}__{{ f.db_name }}", "{{ f.logical_name }}")
if value is not _MISSING:
if value is not None:
kwargs["{{ f.name }}"] = _coerce_{{ snake_name }}_scalar("{{ f.name }}", value)
{%- endfor %}
{%- for field in include_fields %}
relation_value = _get_wire_value(row, "{{ field.logical_name }}_json")
if relation_value is not _MISSING:
kwargs["{{ field.name }}"] = _hydrate_{{ snake_name }}_relation(
"{{ field.name }}",
relation_value,
)
{%- endfor %}
return {{ model_name }}(**kwargs)
def _{{ table_name }}_from_row(row: Dict[str, Any]) -> "{{ model_name }}":
"""Convert engine row (table__column keys) to {{ model_name }} dataclass."""
return _{{ snake_name }}_from_wire(row)
def _{{ snake_name }}_from_row(row: Dict[str, Any]) -> "{{ model_name }}":
"""Compatibility wrapper keyed by model snake_case."""
return _{{ snake_name }}_from_wire(row)
class {{ delegate_name }}:
"""Operations for {{ model_name }}.
{%- if is_async %}
Every method executes immediately when awaited — no extra ``.exec()`` call
is needed::
users = await client.{{ table_name }}.find_many(where={"role": "ADMIN"}, take=10)
user = await client.{{ table_name }}.create({"email": "alice@example.com"})
{%- else %}
Every method executes synchronously — no ``await`` or event loop needed::
users = client.{{ table_name }}.find_many(where={"role": "ADMIN"}, take=10)
user = client.{{ table_name }}.create({"email": "alice@example.com"})
{%- endif %}
"""
def __init__(self, client: NautilusClient) -> None:
self._client = client
{% if is_async %}async {% endif %}def find_many(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
order_by: Optional[{{ model_name }}OrderByInput] = None,
take: Optional[int] = None,
skip: Optional[int] = None,
select: Optional[{{ model_name }}SelectInput] = None,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
cursor: Optional[Dict[str, Any]] = None,
distinct: Optional[List[str]] = None,
chunk_size: Optional[int] = None,
) -> List[{{ model_name }}]:
"""Find all {{ model_name }} records matching the given filters.
Args:
where: Filter conditions (optional).
order_by: Sorting directions, e.g. ``{"created_at": "desc"}``.
take: Maximum number of rows to return (LIMIT).
Positive values paginate **forward**; negative values
paginate **backward** from the cursor position.
skip: Number of rows to skip (OFFSET).
select: Scalar field projection, e.g. ``{"id": True, "display_name": True}``.
Primary-key fields are always returned by the engine.
include: Relations to eager-load, e.g. ``{"posts": True}``.
cursor: Stable keyset cursor — a dict of primary-key field name
to value. All PK fields of the model must be present.
The cursor record is included in the result (inclusive).
distinct: List of field names to deduplicate on.
Postgres: ``SELECT DISTINCT ON (col, ...)`` with the
distinct columns auto-prepended to ORDER BY.
SQLite / MySQL: plain ``SELECT DISTINCT`` (full-row
dedup — most effective combined with ``select``).
chunk_size: Optional protocol-level row chunk size for large result
sets. The client still returns one fully merged list.
Returns:
A list of matching ``{{ model_name }}`` instances.
"""
args: Dict[str, Any] = {}
if where is not None:
args["where"] = _process_where_filters(where, _{{ table_name }}_py_to_db)
if order_by is not None:
if isinstance(order_by, list):
args["orderBy"] = order_by
else:
args["orderBy"] = [{field: direction} for field, direction in order_by.items()]
if take is not None:
args["take"] = take
if skip is not None:
args["skip"] = skip
if select is not None:
args["select"] = _process_select_fields(select, _{{ table_name }}_py_to_logical)
if include is not None:
args["include"] = _serialize_include(include)
if cursor is not None:
args["cursor"] = cursor
if distinct is not None:
args["distinct"] = distinct
payload: Dict[str, Any] = {
"protocolVersion": 1,
"model": "{{ model_name }}",
"args": args,
}
if chunk_size is not None:
payload["chunkSize"] = chunk_size
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.findMany", payload)
return [_{{ table_name }}_from_row(row) for row in result.get("data", [])]
{% if is_async %}async {% endif %}def find_first(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
order_by: Optional[{{ model_name }}OrderByInput] = None,
select: Optional[{{ model_name }}SelectInput] = None,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
) -> Optional[{{ model_name }}]:
"""Find the first {{ model_name }} record matching the given filters.
Returns:
The first matching ``{{ model_name }}`` instance, or ``None``.
"""
rows = {% if is_async %}await {% endif %}self.find_many(where=where, order_by=order_by, take=1, select=select, include=include)
return rows[0] if rows else None
{% if is_async %}async {% endif %}def find_unique(
self,
*,
where: {{ model_name }}WhereInput,
select: Optional[{{ model_name }}SelectInput] = None,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
) -> Optional[{{ model_name }}]:
"""Find a single {{ model_name }} record by a unique filter.
Returns:
The matching ``{{ model_name }}`` instance, or ``None``.
"""
rows = {% if is_async %}await {% endif %}self.find_many(where=where, take=1, select=select, include=include)
return rows[0] if rows else None
{% if is_async %}async {% endif %}def find_unique_or_throw(
self,
*,
where: {{ model_name }}WhereInput,
select: Optional[{{ model_name }}SelectInput] = None,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
) -> {{ model_name }}:
"""Find a single {{ model_name }} record by a unique filter, or raise if not found.
Returns:
The matching ``{{ model_name }}`` instance.
Raises:
NotFoundError: If no record matches the given filter.
"""
from ..errors import NotFoundError
record = {% if is_async %}await {% endif %}self.find_unique(where=where, select=select, include=include)
if record is None:
raise NotFoundError(
f"findUniqueOrThrow: no {{ model_name }} record found matching the given filter"
)
return record
{% if is_async %}async {% endif %}def find_first_or_throw(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
order_by: Optional[{{ model_name }}OrderByInput] = None,
select: Optional[{{ model_name }}SelectInput] = None,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
) -> {{ model_name }}:
"""Find the first {{ model_name }} record matching ``where``, or raise if not found.
Returns:
The first matching ``{{ model_name }}`` instance.
Raises:
NotFoundError: If no record matches the given filters.
"""
from ..errors import NotFoundError
record = {% if is_async %}await {% endif %}self.find_first(where=where, order_by=order_by, select=select, include=include)
if record is None:
raise NotFoundError(
f"findFirstOrThrow: no {{ model_name }} record found matching the given filter"
)
return record
{% if is_async %}async {% endif %}def create(self, data: {{ model_name }}CreateInput, *, return_data: bool = True) -> Optional[{{ model_name }}]:
"""Create a new {{ model_name }} record.
Args:
data: Field values for the new record.
return_data: When ``True`` (default) the created record is returned.
Set to ``False`` for fire-and-forget inserts to avoid the
round-trip overhead of ``RETURNING``.
Returns:
The newly created ``{{ model_name }}`` instance, or ``None`` when
``return_data=False``.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.create", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"data": _process_create_data(data, _{{ table_name }}_py_to_db),
"returnData": return_data,
})
if not return_data:
return None
data_rows = result.get("data", [])
if not data_rows:
raise RuntimeError("Create operation returned no data")
return _{{ table_name }}_from_row(data_rows[0])
{% if is_async %}async {% endif %}def create_many(
self,
data: List[{{ model_name }}CreateInput],
*,
return_data: bool = True,
) -> List[{{ model_name }}]:
"""Create multiple {{ model_name }} records in a single batch.
Args:
data: List of field-value dicts for the records to create.
return_data: When ``True`` (default) the created records are returned.
Set to ``False`` for bulk inserts to skip ``RETURNING`` overhead.
Returns:
The list of newly created ``{{ model_name }}`` instances, or an empty
list when ``return_data=False``.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.createMany", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"data": [_process_create_data(_entry, _{{ table_name }}_py_to_db) for _entry in data],
"returnData": return_data,
})
if not return_data:
return []
return [_{{ table_name }}_from_row(row) for row in result.get("data", [])]
{% if is_async %}async {% endif %}def update(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
data: {{ model_name }}UpdateInput,
return_data: bool = True,
):
"""Update {{ model_name }} records matching ``where``.
Args:
where: Filter conditions. If ``None``, all records are updated.
data: Fields to update.
return_data: When ``True`` (default) returns the updated records.
Set to ``False`` for bulk updates to skip ``RETURNING`` overhead;
the affected-row count is returned instead.
Returns:
``List[{{ model_name }}]`` when ``return_data=True``, or ``int``
(affected-row count) when ``return_data=False``.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.update", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"filter": _process_where_filters(where or {}, _{{ table_name }}_py_to_db),
"data": _process_create_data(data, _{{ table_name }}_py_to_db),
"returnData": return_data,
})
if not return_data:
return result.get("count", 0)
return [_{{ table_name }}_from_row(row) for row in result.get("data", [])]
{% if is_async %}async {% endif %}def delete(
self,
*,
where: {{ model_name }}WhereInput,
return_data: bool = True,
) -> Optional["{{ model_name }}"]:
"""Delete the first {{ model_name }} record matching ``where``.
Finds the first matching record and deletes it by its primary key.
Args:
where: Filter conditions. Required.
return_data: When ``True`` (default) returns the deleted record.
Set to ``False`` to skip ``RETURNING`` overhead; ``None`` is
returned in that case.
Returns:
The deleted ``{{ model_name }}`` instance when ``return_data=True``,
or ``None`` when ``return_data=False``.
Raises:
NotFoundError: If no record matches the given filter.
"""
from ..errors import NotFoundError
record = {% if is_async %}await {% endif %}self.find_first(where=where)
if record is None:
raise NotFoundError(
f"delete: no {{ model_name }} record found matching the given filter"
)
pk_filter = {
{%- for pk in primary_key_fields %}
"{{ pk }}": getattr(record, "{{ pk }}"),
{%- endfor %}
}
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.delete", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"filter": _process_where_filters(pk_filter, _{{ table_name }}_py_to_db),
"returnData": return_data,
})
if return_data:
data_rows = result.get("data", [])
if data_rows:
return _{{ table_name }}_from_row(data_rows[0])
return record
return None
{% if is_async %}async {% endif %}def delete_many(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
return_data: bool = False,
):
"""Delete all {{ model_name }} records matching ``where``.
Args:
where: Filter conditions. If ``None``, all records are deleted.
return_data: When ``True`` returns the deleted records (uses
``RETURNING``). Defaults to ``False`` for efficiency.
Returns:
``int`` (affected-row count) when ``return_data=False`` (default),
or ``List[{{ model_name }}]`` when ``return_data=True``.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.delete", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"filter": _process_where_filters(where or {}, _{{ table_name }}_py_to_db),
"returnData": return_data,
})
if return_data:
return [_{{ table_name }}_from_row(row) for row in result.get("data", [])]
return result.get("count", 0)
{% if is_async %}async {% endif %}def count(
self,
*,
where: Optional[{{ model_name }}WhereInput] = None,
take: Optional[int] = None,
skip: Optional[int] = None,
cursor: Optional[Dict[str, Any]] = None,
) -> int:
"""Count the number of {{ model_name }} records matching the given filters.
Args:
where: Filter conditions (optional).
take: Limit the count to a forward-pagination window of at most this
many rows. When combined with ``skip``, counts only the rows
in that slice (mirrors ``findMany`` semantics).
skip: Number of rows to skip before counting.
cursor: Keyset cursor — combined with ``take``/``skip`` to count
within a paginated window.
Returns:
The integer count of matching records.
"""
args: Dict[str, Any] = {}
if where is not None:
args["where"] = _process_where_filters(where, _{{ table_name }}_py_to_db)
if take is not None:
args["take"] = take
if skip is not None:
args["skip"] = skip
if cursor is not None:
args["cursor"] = cursor
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.count", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"args": args if args else None,
})
return result.get("count", 0)
{% if is_async %}async {% endif %}def group_by(
self,
by: List[{{ model_name }}ScalarFieldKeys],
*,
where: Optional[{{ model_name }}WhereInput] = None,
having: Optional[Dict[str, Any]] = None,
take: Optional[int] = None,
skip: Optional[int] = None,
count: Optional[Union[bool, {{ model_name }}CountAggregateInput]] = None,
{% if has_numeric_fields %}
avg: Optional[{{ model_name }}AvgAggregateInput] = None,
sum: Optional[{{ model_name }}SumAggregateInput] = None,
{% endif %}
min: Optional[{{ model_name }}MinAggregateInput] = None,
max: Optional[{{ model_name }}MaxAggregateInput] = None,
order: Optional[Union[
Dict[{{ model_name }}ScalarFieldKeys, SortOrder],
List[Dict[{{ model_name }}ScalarFieldKeys, SortOrder]],
]] = None,
) -> List[{{ model_name }}GroupByOutput]:
"""Group {{ model_name }} records and compute aggregates.
Args:
by: List of field names to group by. Required.
where: Pre-group filter (applied before grouping).
having: Post-group filter (applied after grouping), e.g.
``{"_count": {"_all": {"gt": 5}}}``.
take: Maximum number of groups to return.
skip: Number of groups to skip.
count: Compute COUNT. Pass ``True`` for ``COUNT(*)`` or a
``{{ model_name }}CountAggregateInput`` dict to count specific fields.
{% if has_numeric_fields %}
avg: Compute AVG for the specified numeric fields.
sum: Compute SUM for the specified numeric fields.
{% endif %}
min: Compute MIN for the specified fields.
max: Compute MAX for the specified fields.
order: Sort order — a mapping or list of mappings from field name
to ``"asc"``/``"desc"``. Aggregate keys like ``_count``,
``_sum``, etc. are also accepted.
Returns:
A list of ``{{ model_name }}GroupByOutput`` dicts, one per group.
"""
args: Dict[str, Any] = {"by": list(by)}
if where is not None:
args["where"] = _process_where_filters(where, _{{ table_name }}_py_to_db)
if having is not None:
args["having"] = having
if take is not None:
args["take"] = take
if skip is not None:
args["skip"] = skip
if count is not None:
args["count"] = count
{% if has_numeric_fields %}
if avg is not None:
args["avg"] = avg
if sum is not None:
args["sum"] = sum
{% endif %}
if min is not None:
args["min"] = min
if max is not None:
args["max"] = max
if order is not None:
if isinstance(order, list):
args["orderBy"] = order
else:
args["orderBy"] = [{field: direction} for field, direction in order.items()]
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.groupBy", {
"protocolVersion": 1,
"model": "{{ model_name }}",
"args": args,
})
return result.get("data", [])
{% if is_async %}async {% endif %}def raw_query(
self,
sql: str,
) -> List[Dict[str, Any]]:
"""Execute a raw SQL string and return the result rows as dicts.
The SQL is sent to the database as-is with no parameter binding.
Prefer :meth:`raw_stmt_query` when user-supplied values are involved to
avoid SQL injection.
Args:
sql: Raw SQL string to execute.
Returns:
A list of dicts mapping column name to value for each result row.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.rawQuery", {
"protocolVersion": 1,
"sql": sql,
})
return result.get("data", [])
{% if is_async %}async {% endif %}def raw_stmt_query(
self,
sql: str,
params: Optional[List[Any]] = None,
) -> List[Dict[str, Any]]:
"""Execute a raw prepared-statement query with bound parameters.
Use ``$1``, ``$2``, … (PostgreSQL) or ``?`` (MySQL / SQLite) as
placeholders. Parameters are bound in the order they appear in *params*.
Args:
sql: SQL string containing parameter placeholders.
params: Ordered list of values to bind to the placeholders.
Returns:
A list of dicts mapping column name to value for each result row.
"""
result = {% if is_async %}await self._client._rpc({% else %}self._client._sync_rpc({% endif %}"query.rawStmtQuery", {
"protocolVersion": 1,
"sql": sql,
"params": params or [],
})
return result.get("data", [])
{% if is_async %}async {% endif %}def upsert(
self,
*,
where: {{ model_name }}WhereInput,
data: {{ model_name }}UpsertInput,
include: Optional[{% if has_includes %}{{ model_name }}IncludeInput{% else %}Dict[str, Any]{% endif %}] = None,
return_data: bool = True,
) -> Optional[{{ model_name }}]:
"""Create or update a {{ model_name }} record.
Looks up an existing record using ``where``. If found, applies
``data["update"]``; otherwise creates a new record with ``data["create"]``.
Args:
where: Filter to locate the existing record (should target a unique field).
data: Dict with ``create`` and ``update`` keys, each holding the
respective field values.
include: Relations to eager-load on the returned record.
return_data: When ``True`` (default) returns the record.
Set to ``False`` to skip ``RETURNING`` overhead; ``None`` is
returned in that case.
Returns:
The created or updated ``{{ model_name }}`` instance, or ``None`` when
``return_data=False``.
"""
create_data: {{ model_name }}CreateInput = data.get("create", {}) # type: ignore[assignment]
update_data: {{ model_name }}UpdateInput = data.get("update", {}) # type: ignore[assignment]
existing = {% if is_async %}await {% endif %}self.find_first(where=where)
if existing is not None:
result = {% if is_async %}await {% endif %}self.update(where=where, data=update_data, return_data=return_data)
if not return_data:
return None
if isinstance(result, list):
record = result[0] if result else existing
else:
record = existing
if include:
rows = {% if is_async %}await {% endif %}self.find_many(where=where, take=1, include=include)
return rows[0] if rows else record
return record
created = {% if is_async %}await {% endif %}self.create(create_data, return_data=return_data)
if not return_data:
return None
if created is None:
raise RuntimeError("upsert: create operation returned no data")
if include:
rows = {% if is_async %}await {% endif %}self.find_many(where=where, take=1, include=include)
return rows[0] if rows else created
return created
__all__ = [
"{{ model_name }}",
"{{ delegate_name }}",
{%- if needs_typeddict %}
"{{ model_name }}WhereInput",
"{{ model_name }}CreateInput",
"{{ model_name }}UpdateInput",
"{{ model_name }}UpsertInput",
"{{ model_name }}OrderByInput",
"{{ model_name }}SelectInput",
"{{ model_name }}ScalarFieldKeys",
"{{ model_name }}CountAggregateInput",
{%- if has_numeric_fields %}
"{{ model_name }}AvgAggregateInput",
"{{ model_name }}SumAggregateInput",
{%- endif %}
"{{ model_name }}MinAggregateInput",
"{{ model_name }}MaxAggregateInput",
"{{ model_name }}GroupByOutput",
"SortOrder",
{%- if has_includes %}
"{{ model_name }}IncludeInput",
{%- for field in include_fields %}
"FindMany{{ field.target_model }}ArgsFrom{{ model_name }}",
{%- endfor %}
{%- for depth in range(end=recursive_type_depth) %}
"{{ model_name }}IncludeRecursive{{ depth + 1 }}",
{%- for field in include_fields %}
"FindMany{{ field.target_model }}ArgsFrom{{ model_name }}Recursive{{ depth + 1 }}",
{%- endfor %}
{%- endfor %}
{%- endif %}
"StringFilter",
"IntFilter",
"FloatFilter",
"DecimalFilter",
"DateTimeFilter",
"UuidFilter",
"BoolFilter",
{%- endif %}
]