/// Input payload for inserting a single {{ model_name }} row.
///
/// Every column is wrapped in `Option`; a field left as `None` is simply not
/// included in the payload sent for the INSERT.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}CreateInput {
{%- for field in create_fields %}
    /// Value for the `{{ field.name }}` column, if provided.
    pub {{ field.name }}: Option<{{ field.rust_type }}>,
{%- endfor %}
}

impl {{ model_name }}CreateInput {
    /// Encodes the provided fields as a JSON object keyed by database column name.
    ///
    /// Fields that are `None` are skipped. For nullable columns, an explicit
    /// inner `None` is encoded as JSON `null`.
    fn to_engine_data(&self) -> serde_json::Value {
        let mut payload = serde_json::Map::new();
{%- for field in create_fields %}
        if let Some(provided) = self.{{ field.name }}.as_ref() {
{%- if field.is_optional %}
            let encoded = provided
                .as_ref()
                .map_or(serde_json::Value::Null, |inner| {
                    nautilus_core::Value::from(inner.clone()).to_json_plain()
                });
{%- else %}
            let encoded = nautilus_core::Value::from(provided.clone()).to_json_plain();
{%- endif %}
            payload.insert(String::from("{{ field.db_name }}"), encoded);
        }
{%- endfor %}
        serde_json::Value::Object(payload)
    }
}
/// Input payload describing which {{ model_name }} columns an UPDATE should set.
///
/// Every column is wrapped in `Option`; only fields that are `Some` end up in
/// the payload.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}UpdateInput {
{%- for field in create_fields %}
    /// New value for the `{{ field.name }}` column, if it should be changed.
    pub {{ field.name }}: Option<{{ field.rust_type }}>,
{%- endfor %}
}

impl {{ model_name }}UpdateInput {
    /// Encodes the assigned fields as a JSON object keyed by database column name.
    ///
    /// Fields that are `None` are skipped, so an input with nothing set yields
    /// an empty object. For nullable columns, an explicit inner `None` is
    /// encoded as JSON `null`.
    fn to_engine_data(&self) -> serde_json::Value {
        let mut assignments = serde_json::Map::new();
{%- for field in create_fields %}
        if let Some(provided) = self.{{ field.name }}.as_ref() {
{%- if field.is_optional %}
            let encoded = provided
                .as_ref()
                .map_or(serde_json::Value::Null, |inner| {
                    nautilus_core::Value::from(inner.clone()).to_json_plain()
                });
{%- else %}
            let encoded = nautilus_core::Value::from(provided.clone()).to_json_plain();
{%- endif %}
            assignments.insert(String::from("{{ field.db_name }}"), encoded);
        }
{%- endfor %}
        serde_json::Value::Object(assignments)
    }
}
/// Structured arguments for updating {{ model_name }} records.
///
/// Passed to `{{ delegate_name }}::update`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}UpdateArgs {
/// Optional WHERE filter — if `None`, all rows are updated.
pub where_: Option<nautilus_core::Expr>,
/// Fields to update. When no field is set, `update` returns an empty list
/// without issuing a query.
pub data: {{ model_name }}UpdateInput,
}
/// Structured arguments for deleting {{ model_name }} records.
///
/// Shared by `{{ delegate_name }}::delete` and `{{ delegate_name }}::delete_many`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}DeleteArgs {
/// Optional WHERE filter. With `None`, `delete_many` deletes all rows, while
/// `delete` removes only the first record returned by `find_first`.
pub where_: Option<nautilus_core::Expr>,
}
/// Structured arguments for upserting a {{ model_name }} record.
///
/// `Default` is implemented by hand so that `return_data` starts out as
/// `true`, as documented; a derived `Default` would initialise it to `false`.
#[derive(Debug, Clone)]
pub struct {{ model_name }}UpsertArgs {
    /// WHERE filter used to look up the existing record (should target a unique field).
    pub where_: Option<nautilus_core::Expr>,
    /// Fields used when creating a new record (record not found).
    pub create: {{ model_name }}CreateInput,
    /// Fields applied when updating an existing record.
    pub update: {{ model_name }}UpdateInput,
    /// Whether to return the created/updated record (default: true).
    pub return_data: bool,
}

impl Default for {{ model_name }}UpsertArgs {
    /// No filter, empty create/update inputs, and `return_data` enabled.
    fn default() -> Self {
        Self {
            where_: None,
            create: {{ model_name }}CreateInput::default(),
            update: {{ model_name }}UpdateInput::default(),
            return_data: true,
        }
    }
}
/// Scalar fields of {{ model_name }} that the generated aggregate APIs can refer to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum {{ model_name }}ScalarField {
{%- for field in scalar_fields %}
    /// The `{{ field.logical_name }}` field.
    {{ field.variant_name }},
{%- endfor %}
}

impl {{ model_name }}ScalarField {
    /// Returns the logical field name this variant is serialized as.
    fn as_str(&self) -> &'static str {
        match *self {
{%- for field in scalar_fields %}
            {{ model_name }}ScalarField::{{ field.variant_name }} => "{{ field.logical_name }}",
{%- endfor %}
        }
    }
}
/// Direction of a {{ model_name }} `group_by()` ordering item.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum {{ model_name }}SortOrder {
    /// Ascending order, serialized as `"asc"`.
    Asc,
    /// Descending order, serialized as `"desc"`.
    Desc,
}

impl {{ model_name }}SortOrder {
    /// Returns the lowercase keyword this direction is serialized as.
    fn as_str(&self) -> &'static str {
        match *self {
            {{ model_name }}SortOrder::Desc => "desc",
            {{ model_name }}SortOrder::Asc => "asc",
        }
    }
}
/// Arguments accepted by `{{ delegate_name }}::count`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}CountArgs {
    /// Optional WHERE filter.
    pub where_: Option<nautilus_core::Expr>,
    /// Limit the counted window.
    pub take: Option<i32>,
    /// Offset the counted window.
    pub skip: Option<u32>,
    /// Optional keyset cursor for paginated counts.
    pub cursor: Option<std::collections::HashMap<String, nautilus_core::Value>>,
}

impl {{ model_name }}CountArgs {
    /// Builds the JSON argument object for a count request.
    ///
    /// Only the options that are set are emitted (`where`, `take`, `skip`,
    /// `cursor`). Returns `Ok(None)` when nothing is set.
    ///
    /// # Errors
    ///
    /// Propagates the error from `where_expr_to_protocol_json` when the filter
    /// cannot be converted.
    fn to_engine_args(&self) -> nautilus_core::Result<Option<serde_json::Value>> {
        let mut payload = serde_json::Map::new();

        if let Some(filter) = &self.where_ {
            let encoded = nautilus_core::where_expr_to_protocol_json(filter)?;
            payload.insert("where".to_string(), encoded);
        }
        if let Some(limit) = self.take {
            payload.insert("take".to_string(), serde_json::Value::from(limit));
        }
        if let Some(offset) = self.skip {
            payload.insert("skip".to_string(), serde_json::Value::from(offset));
        }
        if let Some(position) = &self.cursor {
            payload.insert("cursor".to_string(), serialize_cursor_for_engine(position));
        }

        if payload.is_empty() {
            Ok(None)
        } else {
            Ok(Some(serde_json::Value::Object(payload)))
        }
    }
}
/// Selection of COUNT aggregates to request from `{{ delegate_name }}::group_by`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}CountAggregateInput {
    /// Request the `_all` count.
    pub _all: bool,
{%- for field in scalar_fields %}
    /// Request the count for `{{ field.logical_name }}`.
    pub {{ field.name }}: bool,
{%- endfor %}
}

impl {{ model_name }}CountAggregateInput {
    /// Encodes the enabled flags as `{ "<name>": true, … }`.
    ///
    /// Returns `None` when no flag is enabled.
    fn to_json(&self) -> Option<serde_json::Value> {
        // (serialized key, enabled?) in declaration order.
        let flags: &[(&str, bool)] = &[
            ("_all", self._all),
{%- for field in scalar_fields %}
            ("{{ field.logical_name }}", self.{{ field.name }}),
{%- endfor %}
        ];
        let selected: serde_json::Map<String, serde_json::Value> = flags
            .iter()
            .filter(|(_, enabled)| *enabled)
            .map(|(name, _)| (name.to_string(), serde_json::Value::Bool(true)))
            .collect();
        if selected.is_empty() {
            None
        } else {
            Some(serde_json::Value::Object(selected))
        }
    }
}
{%- if has_numeric_fields %}
/// Selection of AVG aggregates to request from `{{ delegate_name }}::group_by`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}AvgAggregateInput {
{%- for field in numeric_fields %}
    /// Request the average of `{{ field.logical_name }}`.
    pub {{ field.name }}: bool,
{%- endfor %}
}

impl {{ model_name }}AvgAggregateInput {
    /// Encodes the enabled flags as `{ "<name>": true, … }`.
    ///
    /// Returns `None` when no flag is enabled.
    fn to_json(&self) -> Option<serde_json::Value> {
        // (serialized key, enabled?) in declaration order.
        let flags: &[(&str, bool)] = &[
{%- for field in numeric_fields %}
            ("{{ field.logical_name }}", self.{{ field.name }}),
{%- endfor %}
        ];
        let selected: serde_json::Map<String, serde_json::Value> = flags
            .iter()
            .filter(|(_, enabled)| *enabled)
            .map(|(name, _)| (name.to_string(), serde_json::Value::Bool(true)))
            .collect();
        if selected.is_empty() {
            None
        } else {
            Some(serde_json::Value::Object(selected))
        }
    }
}
/// Selection of SUM aggregates to request from `{{ delegate_name }}::group_by`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}SumAggregateInput {
{%- for field in numeric_fields %}
    /// Request the sum of `{{ field.logical_name }}`.
    pub {{ field.name }}: bool,
{%- endfor %}
}

impl {{ model_name }}SumAggregateInput {
    /// Encodes the enabled flags as `{ "<name>": true, … }`.
    ///
    /// Returns `None` when no flag is enabled.
    fn to_json(&self) -> Option<serde_json::Value> {
        // (serialized key, enabled?) in declaration order.
        let flags: &[(&str, bool)] = &[
{%- for field in numeric_fields %}
            ("{{ field.logical_name }}", self.{{ field.name }}),
{%- endfor %}
        ];
        let selected: serde_json::Map<String, serde_json::Value> = flags
            .iter()
            .filter(|(_, enabled)| *enabled)
            .map(|(name, _)| (name.to_string(), serde_json::Value::Bool(true)))
            .collect();
        if selected.is_empty() {
            None
        } else {
            Some(serde_json::Value::Object(selected))
        }
    }
}
{%- endif %}
/// Selection of MIN aggregates to request from `{{ delegate_name }}::group_by`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}MinAggregateInput {
{%- for field in orderable_fields %}
    /// Request the minimum of `{{ field.logical_name }}`.
    pub {{ field.name }}: bool,
{%- endfor %}
}

impl {{ model_name }}MinAggregateInput {
    /// Encodes the enabled flags as `{ "<name>": true, … }`.
    ///
    /// Returns `None` when no flag is enabled.
    fn to_json(&self) -> Option<serde_json::Value> {
        // Explicit slice type keeps this well-typed even if the list renders empty.
        let flags: &[(&str, bool)] = &[
{%- for field in orderable_fields %}
            ("{{ field.logical_name }}", self.{{ field.name }}),
{%- endfor %}
        ];
        let selected: serde_json::Map<String, serde_json::Value> = flags
            .iter()
            .filter(|(_, enabled)| *enabled)
            .map(|(name, _)| (name.to_string(), serde_json::Value::Bool(true)))
            .collect();
        if selected.is_empty() {
            None
        } else {
            Some(serde_json::Value::Object(selected))
        }
    }
}
/// Selection of MAX aggregates to request from `{{ delegate_name }}::group_by`.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}MaxAggregateInput {
{%- for field in orderable_fields %}
    /// Request the maximum of `{{ field.logical_name }}`.
    pub {{ field.name }}: bool,
{%- endfor %}
}

impl {{ model_name }}MaxAggregateInput {
    /// Encodes the enabled flags as `{ "<name>": true, … }`.
    ///
    /// Returns `None` when no flag is enabled.
    fn to_json(&self) -> Option<serde_json::Value> {
        // Explicit slice type keeps this well-typed even if the list renders empty.
        let flags: &[(&str, bool)] = &[
{%- for field in orderable_fields %}
            ("{{ field.logical_name }}", self.{{ field.name }}),
{%- endfor %}
        ];
        let selected: serde_json::Map<String, serde_json::Value> = flags
            .iter()
            .filter(|(_, enabled)| *enabled)
            .map(|(name, _)| (name.to_string(), serde_json::Value::Bool(true)))
            .collect();
        if selected.is_empty() {
            None
        } else {
            Some(serde_json::Value::Object(selected))
        }
    }
}
/// A single ORDER BY entry for `{{ delegate_name }}::group_by()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum {{ model_name }}GroupByOrderBy {
    /// Order by a scalar field; serialized as `{ "<field>": "<dir>" }`.
    Field {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
    /// Order by the overall count; serialized as `{ "_count": { "_all": "<dir>" } }`.
    CountAll({{ model_name }}SortOrder),
    /// Order by a per-field count; serialized under `_count`.
    Count {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
{%- if has_numeric_fields %}
    /// Order by a field average; serialized under `_avg`.
    Avg {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
    /// Order by a field sum; serialized under `_sum`.
    Sum {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
{%- endif %}
{%- if has_orderable_fields %}
    /// Order by a field minimum; serialized under `_min`.
    Min {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
    /// Order by a field maximum; serialized under `_max`.
    Max {
        field: {{ model_name }}ScalarField,
        direction: {{ model_name }}SortOrder,
    },
{%- endif %}
}

impl {{ model_name }}GroupByOrderBy {
    /// Serializes this item to its JSON form.
    ///
    /// Plain field ordering yields `{ "<field>": "<dir>" }`; aggregate
    /// ordering nests that object under the aggregate key
    /// (`_count`, `_avg`, `_sum`, `_min`, `_max`).
    fn to_json(&self) -> serde_json::Value {
        // Reduce every variant to (optional aggregate wrapper, inner key, direction).
        let (wrapper, key, direction) = match *self {
            Self::Field { field, direction } => (None, field.as_str(), direction),
            Self::CountAll(direction) => (Some("_count"), "_all", direction),
            Self::Count { field, direction } => (Some("_count"), field.as_str(), direction),
{%- if has_numeric_fields %}
            Self::Avg { field, direction } => (Some("_avg"), field.as_str(), direction),
            Self::Sum { field, direction } => (Some("_sum"), field.as_str(), direction),
{%- endif %}
{%- if has_orderable_fields %}
            Self::Min { field, direction } => (Some("_min"), field.as_str(), direction),
            Self::Max { field, direction } => (Some("_max"), field.as_str(), direction),
{%- endif %}
        };

        let mut inner = serde_json::Map::new();
        inner.insert(
            key.to_string(),
            serde_json::Value::String(direction.as_str().to_string()),
        );
        let leaf = serde_json::Value::Object(inner);

        match wrapper {
            None => leaf,
            Some(aggregate) => {
                let mut outer = serde_json::Map::new();
                outer.insert(aggregate.to_string(), leaf);
                serde_json::Value::Object(outer)
            }
        }
    }
}
/// Arguments accepted by `{{ delegate_name }}::group_by`: grouping keys, filters,
/// paging, aggregate selections and ordering.
#[derive(Debug, Default, Clone)]
pub struct {{ model_name }}GroupByArgs {
    /// Fields to group by (serialized by logical name).
    pub by: Vec<{{ model_name }}ScalarField>,
    /// Optional pre-group WHERE filter.
    pub where_: Option<nautilus_core::Expr>,
    /// Optional HAVING JSON object using the protocol aggregate shape.
    pub having: Option<serde_json::Value>,
    /// Limit the number of returned groups.
    pub take: Option<i32>,
    /// Offset the returned groups.
    pub skip: Option<u32>,
    /// COUNT aggregation selection.
    pub count: Option<{{ model_name }}CountAggregateInput>,
{%- if has_numeric_fields %}
    /// AVG aggregation selection.
    pub avg: Option<{{ model_name }}AvgAggregateInput>,
    /// SUM aggregation selection.
    pub sum: Option<{{ model_name }}SumAggregateInput>,
{%- endif %}
    /// MIN aggregation selection.
    pub min: Option<{{ model_name }}MinAggregateInput>,
    /// MAX aggregation selection.
    pub max: Option<{{ model_name }}MaxAggregateInput>,
    /// ORDER BY items applied after grouping.
    pub order_by: Vec<{{ model_name }}GroupByOrderBy>,
}

impl {{ model_name }}GroupByArgs {
    /// Builds the JSON argument object for a groupBy request.
    ///
    /// `by` is always emitted. Every other key appears only when the
    /// corresponding option is set; aggregate selections are additionally
    /// dropped when none of their flags is enabled.
    ///
    /// # Errors
    ///
    /// Propagates the error from `where_expr_to_protocol_json` when the filter
    /// cannot be converted.
    fn to_engine_args(&self) -> nautilus_core::Result<serde_json::Value> {
        let mut payload = serde_json::Map::new();

        let group_keys: Vec<serde_json::Value> = self
            .by
            .iter()
            .map(|field| serde_json::Value::String(field.as_str().to_string()))
            .collect();
        payload.insert("by".to_string(), serde_json::Value::Array(group_keys));

        if let Some(filter) = &self.where_ {
            let encoded = nautilus_core::where_expr_to_protocol_json(filter)?;
            payload.insert("where".to_string(), encoded);
        }
        if let Some(having) = &self.having {
            payload.insert("having".to_string(), having.clone());
        }
        if let Some(limit) = self.take {
            payload.insert("take".to_string(), serde_json::Value::from(limit));
        }
        if let Some(offset) = self.skip {
            payload.insert("skip".to_string(), serde_json::Value::from(offset));
        }

        if let Some(count) = self.count.as_ref().and_then(|selection| selection.to_json()) {
            payload.insert("count".to_string(), count);
        }
{%- if has_numeric_fields %}
        if let Some(avg) = self.avg.as_ref().and_then(|selection| selection.to_json()) {
            payload.insert("avg".to_string(), avg);
        }
        if let Some(sum) = self.sum.as_ref().and_then(|selection| selection.to_json()) {
            payload.insert("sum".to_string(), sum);
        }
{%- endif %}
        if let Some(min) = self.min.as_ref().and_then(|selection| selection.to_json()) {
            payload.insert("min".to_string(), min);
        }
        if let Some(max) = self.max.as_ref().and_then(|selection| selection.to_json()) {
            payload.insert("max".to_string(), max);
        }

        if !self.order_by.is_empty() {
            let ordering: Vec<serde_json::Value> =
                self.order_by.iter().map(|item| item.to_json()).collect();
            payload.insert("orderBy".to_string(), serde_json::Value::Array(ordering));
        }

        Ok(serde_json::Value::Object(payload))
    }
}
/// Typed COUNT aggregates returned by `{{ delegate_name }}::group_by()`.
///
/// A field is `None` when its key is missing from the `_count` object or holds
/// JSON `null`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}CountAggregateOutput {
/// Value decoded from the `_all` key.
pub _all: Option<i64>,
{%- for field in scalar_fields %}
/// Value decoded from the `{{ field.logical_name }}` key.
pub {{ field.name }}: Option<i64>,
{%- endfor %}
}
impl {{ model_name }}CountAggregateOutput {
/// Decodes the struct from the JSON object stored under `_count`.
///
/// Each field is read with `decode_optional_group_by_json_value`; the first
/// decoding error is returned.
fn from_json_object(
obj: &serde_json::Map<String, serde_json::Value>,
) -> nautilus_core::Result<Self> {
Ok(Self {
_all: decode_optional_group_by_json_value(obj, "_all")?,
{%- for field in scalar_fields %}
{{ field.name }}: decode_optional_group_by_json_value(obj, "{{ field.logical_name }}")?,
{%- endfor %}
})
}
}
{%- if has_numeric_fields %}
/// Typed AVG aggregates returned by `{{ delegate_name }}::group_by()`.
///
/// A field is `None` when its key is missing from the `_avg` object or holds
/// JSON `null`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}AvgAggregateOutput {
{%- for field in numeric_fields %}
/// Value decoded from the `{{ field.logical_name }}` key.
pub {{ field.name }}: Option<{{ field.avg_rust_type }}>,
{%- endfor %}
}
impl {{ model_name }}AvgAggregateOutput {
/// Decodes the struct from the JSON object stored under `_avg`.
///
/// Each field is read with `decode_optional_group_by_json_value`; the first
/// decoding error is returned.
fn from_json_object(
obj: &serde_json::Map<String, serde_json::Value>,
) -> nautilus_core::Result<Self> {
Ok(Self {
{%- for field in numeric_fields %}
{{ field.name }}: decode_optional_group_by_json_value(obj, "{{ field.logical_name }}")?,
{%- endfor %}
})
}
}
/// Typed SUM aggregates returned by `{{ delegate_name }}::group_by()`.
///
/// A field is `None` when its key is missing from the `_sum` object or holds
/// JSON `null`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}SumAggregateOutput {
{%- for field in numeric_fields %}
/// Value decoded from the `{{ field.logical_name }}` key.
pub {{ field.name }}: Option<{{ field.sum_rust_type }}>,
{%- endfor %}
}
impl {{ model_name }}SumAggregateOutput {
/// Decodes the struct from the JSON object stored under `_sum`.
///
/// Each field is read with `decode_optional_group_by_json_value`; the first
/// decoding error is returned.
fn from_json_object(
obj: &serde_json::Map<String, serde_json::Value>,
) -> nautilus_core::Result<Self> {
Ok(Self {
{%- for field in numeric_fields %}
{{ field.name }}: decode_optional_group_by_json_value(obj, "{{ field.logical_name }}")?,
{%- endfor %}
})
}
}
{%- endif %}
/// Typed MIN aggregates returned by `{{ delegate_name }}::group_by()`.
///
/// A field is `None` when its key is missing from the `_min` object or holds
/// JSON `null`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}MinAggregateOutput {
{%- for field in orderable_fields %}
/// Value decoded from the `{{ field.logical_name }}` key.
pub {{ field.name }}: Option<{{ field.rust_type }}>,
{%- endfor %}
}
impl {{ model_name }}MinAggregateOutput {
/// Decodes the struct from the JSON object stored under `_min`.
///
/// Each field is read with `decode_optional_group_by_json_value`; the first
/// decoding error is returned.
fn from_json_object(
obj: &serde_json::Map<String, serde_json::Value>,
) -> nautilus_core::Result<Self> {
Ok(Self {
{%- for field in orderable_fields %}
{{ field.name }}: decode_optional_group_by_json_value(obj, "{{ field.logical_name }}")?,
{%- endfor %}
})
}
}
/// Typed MAX aggregates returned by `{{ delegate_name }}::group_by()`.
///
/// A field is `None` when its key is missing from the `_max` object or holds
/// JSON `null`.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}MaxAggregateOutput {
{%- for field in orderable_fields %}
/// Value decoded from the `{{ field.logical_name }}` key.
pub {{ field.name }}: Option<{{ field.rust_type }}>,
{%- endfor %}
}
impl {{ model_name }}MaxAggregateOutput {
/// Decodes the struct from the JSON object stored under `_max`.
///
/// Each field is read with `decode_optional_group_by_json_value`; the first
/// decoding error is returned.
fn from_json_object(
obj: &serde_json::Map<String, serde_json::Value>,
) -> nautilus_core::Result<Self> {
Ok(Self {
{%- for field in orderable_fields %}
{{ field.name }}: decode_optional_group_by_json_value(obj, "{{ field.logical_name }}")?,
{%- endfor %}
})
}
}
/// One typed row returned by `{{ delegate_name }}::group_by()`.
///
/// Scalar fields and aggregate objects are all optional: each is `None` when
/// the row has no such column or the column is null.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct {{ model_name }}GroupByOutput {
{%- for field in scalar_fields %}
/// Value of the `{{ field.logical_name }}` column in this row.
pub {{ field.name }}: Option<{{ field.base_rust_type }}>,
{%- endfor %}
/// COUNT aggregates decoded from the `_count` column.
pub _count: Option<{{ model_name }}CountAggregateOutput>,
{%- if has_numeric_fields %}
/// AVG aggregates decoded from the `_avg` column.
pub _avg: Option<{{ model_name }}AvgAggregateOutput>,
/// SUM aggregates decoded from the `_sum` column.
pub _sum: Option<{{ model_name }}SumAggregateOutput>,
{%- endif %}
/// MIN aggregates decoded from the `_min` column.
pub _min: Option<{{ model_name }}MinAggregateOutput>,
/// MAX aggregates decoded from the `_max` column.
pub _max: Option<{{ model_name }}MaxAggregateOutput>,
}
impl {{ model_name }}GroupByOutput {
/// Decodes one groupBy result row.
///
/// Scalar columns are read with `decode_optional_group_by_row_value`.
/// Aggregate columns must hold a JSON object (or be null/absent) and are
/// decoded by the matching `*AggregateOutput::from_json_object`. The first
/// decoding error is returned.
fn from_row(row: &crate::Row) -> nautilus_core::Result<Self> {
Ok(Self {
{%- for field in scalar_fields %}
{{ field.name }}: decode_optional_group_by_row_value(row, "{{ field.logical_name }}")?,
{%- endfor %}
// `map` + `transpose` turns Option<Result<T>> into Result<Option<T>> so `?` applies.
_count: decode_optional_group_by_object(row, "_count")?
.map({{ model_name }}CountAggregateOutput::from_json_object)
.transpose()?,
{%- if has_numeric_fields %}
_avg: decode_optional_group_by_object(row, "_avg")?
.map({{ model_name }}AvgAggregateOutput::from_json_object)
.transpose()?,
_sum: decode_optional_group_by_object(row, "_sum")?
.map({{ model_name }}SumAggregateOutput::from_json_object)
.transpose()?,
{%- endif %}
_min: decode_optional_group_by_object(row, "_min")?
.map({{ model_name }}MinAggregateOutput::from_json_object)
.transpose()?,
_max: decode_optional_group_by_object(row, "_max")?
.map({{ model_name }}MaxAggregateOutput::from_json_object)
.transpose()?,
})
}
}
/// Converts a cursor map into a JSON object, encoding each value with
/// `to_json_plain` and keeping the keys unchanged.
fn serialize_cursor_for_engine(
    cursor: &std::collections::HashMap<String, nautilus_core::Value>,
) -> serde_json::Value {
    let mut encoded = serde_json::Map::new();
    for (column, value) in cursor {
        encoded.insert(column.clone(), value.to_json_plain());
    }
    serde_json::Value::Object(encoded)
}
/// Reads column `key` from a groupBy row and converts it to `T`.
///
/// Returns `Ok(None)` when the column is absent or null; conversion errors
/// from `FromValue::from_value` are returned unchanged.
fn decode_optional_group_by_row_value<T>(
    row: &crate::Row,
    key: &str,
) -> nautilus_core::Result<Option<T>>
where
    T: nautilus_core::FromValue,
{
    let Some(value) = row.get(key) else {
        return Ok(None);
    };
    if matches!(value, nautilus_core::Value::Null) {
        return Ok(None);
    }
    let decoded = <T as nautilus_core::FromValue>::from_value(value)?;
    Ok(Some(decoded))
}
fn decode_optional_group_by_object<'a>(
row: &'a crate::Row,
key: &str,
) -> nautilus_core::Result<Option<&'a serde_json::Map<String, serde_json::Value>>> {
match row.get(key) {
Some(nautilus_core::Value::Json(serde_json::Value::Object(obj))) => Ok(Some(obj)),
Some(nautilus_core::Value::Null) | None => Ok(None),
Some(other) => Err(nautilus_core::Error::TypeError(format!(
"expected object-valued aggregate '{}' in groupBy output, got {:?}",
key, other
))),
}
}
/// Reads entry `key` from an aggregate JSON object and converts it to `T`.
///
/// Returns `Ok(None)` when the entry is absent or JSON `null`. Otherwise the
/// JSON value is mapped through `wire_value_to_core_value` and decoded with
/// `FromValue::from_value_owned`.
fn decode_optional_group_by_json_value<T>(
    obj: &serde_json::Map<String, serde_json::Value>,
    key: &str,
) -> nautilus_core::Result<Option<T>>
where
    T: nautilus_core::FromValue,
{
    let raw = match obj.get(key) {
        Some(raw) if !raw.is_null() => raw,
        _ => return Ok(None),
    };
    let core_value = crate::runtime::wire_value_to_core_value(key, raw);
    <T as nautilus_core::FromValue>::from_value_owned(core_value).map(Some)
}
fn serialize_update_filter_for_engine(
filter: Option<&nautilus_core::Expr>,
) -> nautilus_core::Result<Option<serde_json::Value>> {
let filter_json = match filter {
Some(expr) => match nautilus_core::where_expr_to_protocol_json(expr) {
Ok(value) => value,
Err(nautilus_core::Error::InvalidQuery(_)) => return Ok(None),
Err(err) => return Err(err),
},
None => return Ok(None),
};
Ok(Some(filter_json))
}
/// Delegate for {{ model_name }} queries.
///
/// Obtained via `client.{{ table_name }}` (or the model accessor). Each method
/// accepts a structured args object and executes the query immediately — no
/// extra `.exec()` call is needed.
pub struct {{ delegate_name }}<E: crate::Executor> {
// Client handle; methods clone it whenever a query builder or blocking task
// needs an owned copy.
client: crate::Client<E>,
}
impl<E> {{ delegate_name }}<E>
where
E: crate::Executor,
for<'a> E::Row<'a>: Into<crate::Row>,
{
pub(crate) fn new(client: crate::Client<E>) -> Self {
{{ delegate_name }} { client }
}
/// Find all {{ model_name }} records matching `args`.
///
/// Executes immediately and returns a `Vec<{{ model_name }}>`.
pub {% if is_async %}async {% endif %}fn find_many(
&self,
args: nautilus_core::FindManyArgs,
) -> nautilus_core::Result<Vec<{{ model_name }}>> {
{%- if not is_async %}
let _c = self.client.clone();
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
if let Some(rows) =
crate::runtime::try_find_many_via_engine::<_, {{ model_name }}>(
{% if is_async %}&self.client{% else %}&_c{% endif %},
"{{ model_name }}",
&args,
)
.await?
{
return Ok(rows);
}
if !args.include.is_empty() {
return Err(nautilus_core::Error::InvalidQuery(
"include queries require the embedded engine path in the generated Rust client"
.to_string(),
));
}
let mut builder = {{ find_many_name }}::new({% if is_async %}self.client.clone(){% else %}_c{% endif %});
if let Some(filter) = args.where_ {
builder = builder.where_(filter);
}
for order in args.order_by {
builder = builder.order_by(order);
}
if let Some(take) = args.take {
builder = builder.take(take);
}
if let Some(skip) = args.skip {
builder = builder.skip(skip);
}
if !args.select.is_empty() {
let set: std::collections::HashSet<String> = args.select
.into_iter()
.filter_map(|(k, v)| if v { Some(k) } else { None })
.collect();
if !set.is_empty() {
builder = builder.select_fields(set);
}
}
if let Some(cursor) = args.cursor {
builder = builder.cursor(cursor);
}
if !args.distinct.is_empty() {
builder = builder.distinct(args.distinct);
}
builder.exec().await
{%- if not is_async %}
}))
{%- endif %}
}
/// Return the first {{ model_name }} record matching `args`, or `None`.
///
/// Runs `find_many` with `take` forced to `1`; any `take` supplied by the
/// caller is overwritten.
pub {% if is_async %}async {% endif %}fn find_first(
    &self,
    args: nautilus_core::FindManyArgs,
) -> nautilus_core::Result<Option<{{ model_name }}>> {
    let mut limited = args;
    limited.take = Some(1);
    let rows = self.find_many(limited){% if is_async %}.await{% endif %}?;
    Ok(rows.into_iter().next())
}
/// Look up a single {{ model_name }} record by a unique filter; `None` if absent.
///
/// `args.where_` should reference a `@id` or `@unique` field for correct
/// semantics. The lookup is a `find_many` limited to one row, forwarding
/// `select` and `include`.
pub {% if is_async %}async {% endif %}fn find_unique(
    &self,
    args: nautilus_core::FindUniqueArgs,
) -> nautilus_core::Result<Option<{{ model_name }}>> {
    let mut find_args = nautilus_core::FindManyArgs {
        take: Some(1),
        ..Default::default()
    };
    find_args.where_ = Some(args.where_);
    find_args.select = args.select;
    find_args.include = args.include;

    let rows = self.find_many(find_args){% if is_async %}.await{% endif %}?;
    Ok(rows.into_iter().next())
}
/// Find a single {{ model_name }} record by a unique filter, or return `Error::NotFound`.
pub {% if is_async %}async {% endif %}fn find_unique_or_throw(
&self,
args: nautilus_core::FindUniqueArgs,
) -> nautilus_core::Result<{{ model_name }}> {
self.find_unique(args){% if is_async %}.await{% endif %}?.ok_or_else(|| {
nautilus_core::Error::NotFound(
format!("No {} record found matching the given unique filter", "{{ model_name }}")
)
})
}
/// Find the first {{ model_name }} record matching `args`, or return `Error::NotFound`.
pub {% if is_async %}async {% endif %}fn find_first_or_throw(
&self,
args: nautilus_core::FindManyArgs,
) -> nautilus_core::Result<{{ model_name }}> {
self.find_first(args){% if is_async %}.await{% endif %}?.ok_or_else(|| {
nautilus_core::Error::NotFound(
format!("No {} record found matching the given filter", "{{ model_name }}")
)
})
}
/// Insert one {{ model_name }} record and return it.
///
/// The engine path (`try_create_via_engine`) is tried first; when it yields
/// no record, the insert is built with `{{ create_name }}` from the fields
/// that are set on `data` and executed immediately.
pub {% if is_async %}async {% endif %}fn create(
    &self,
    data: {{ model_name }}CreateInput,
) -> nautilus_core::Result<{{ model_name }}> {
{%- if not is_async %}
    let _c = self.client.clone();
    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
    let engine_record = crate::runtime::try_create_via_engine::<_, {{ model_name }}>(
        {% if is_async %}&self.client{% else %}&_c{% endif %},
        "{{ model_name }}",
        data.to_engine_data(),
    )
    .await?;
    if let Some(created) = engine_record {
        return Ok(created);
    }

    // Builder fallback: forward only the fields the caller actually set.
    let mut insert = {{ create_name }}::new({% if is_async %}self.client.clone(){% else %}_c{% endif %});
{%- for field in create_fields %}
    if let Some(provided) = data.{{ field.name }} {
        insert = insert.{{ field.name }}(provided);
    }
{%- endfor %}
    insert.exec().await
{%- if not is_async %}
    }))
{%- endif %}
}
/// Insert several {{ model_name }} records in one batch and return them.
///
/// An empty `entries` list returns an empty result without issuing a query.
/// Otherwise the engine path (`try_create_many_via_engine`) is tried first,
/// falling back to `{{ create_many_name }}`.
pub {% if is_async %}async {% endif %}fn create_many(
    &self,
    entries: Vec<{{ entry_name }}>,
) -> nautilus_core::Result<Vec<{{ model_name }}>> {
{%- if not is_async %}
    let _c = self.client.clone();
    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
    if entries.is_empty() {
        return Ok(Vec::new());
    }

    let engine_rows = crate::runtime::try_create_many_via_engine::<_, {{ model_name }}>(
        {% if is_async %}&self.client{% else %}&_c{% endif %},
        "{{ model_name }}",
        entries.iter().map(|entry| entry.to_engine_data()).collect(),
    )
    .await?;
    if let Some(rows) = engine_rows {
        return Ok(rows);
    }

    let batch = entries.into_iter().fold(
        {{ create_many_name }}::new({% if is_async %}self.client.clone(){% else %}_c{% endif %}),
        |acc, entry| acc.entry(entry),
    );
    batch.exec().await
{%- if not is_async %}
    }))
{%- endif %}
}
/// Update the {{ model_name }} records selected by `args.where_` and return them.
///
/// If `args.where_` is `None`, all rows are updated. When `args.data` sets no
/// field at all, nothing is executed and an empty list is returned.
///
/// The engine path (`try_update_via_engine`) is used only when a filter is
/// present and can be serialized to protocol JSON; otherwise, or when the
/// engine yields no rows, the update runs through `{{ update_name }}`.
pub {% if is_async %}async {% endif %}fn update(
    &self,
    args: {{ model_name }}UpdateArgs,
) -> nautilus_core::Result<Vec<{{ model_name }}>> {
{%- if not is_async %}
    let _c = self.client.clone();
    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
    let payload = args.data.to_engine_data();
    // Nothing to assign: skip the round trip entirely.
    if matches!(payload.as_object(), Some(fields) if fields.is_empty()) {
        return Ok(Vec::new());
    }

    let engine_filter = serialize_update_filter_for_engine(args.where_.as_ref())?;
    if let Some(filter_json) = engine_filter {
        let engine_rows = crate::runtime::try_update_via_engine::<_, {{ model_name }}>(
            {% if is_async %}&self.client{% else %}&_c{% endif %},
            "{{ model_name }}",
            filter_json,
            payload,
        )
        .await?;
        if let Some(rows) = engine_rows {
            return Ok(rows);
        }
    }

    let statement = {{ update_name }}::new({% if is_async %}self.client.clone(){% else %}_c{% endif %}).set_data(args.data);
    let statement = match args.where_ {
        Some(filter) => statement.where_(filter),
        None => statement.all(),
    };
    statement.exec().await
{%- if not is_async %}
    }))
{%- endif %}
}
/// Delete the first {{ model_name }} record matching `args.where_`.
///
/// Finds the first matching record with `find_first`, then deletes it by its
/// primary key. Returns `Ok(None)` if no record matches; otherwise returns the
/// record as it was read before the delete ran.
///
/// The lookup and the delete are two separate statements.
pub {% if is_async %}async {% endif %}fn delete(
&self,
args: {{ model_name }}DeleteArgs,
) -> nautilus_core::Result<Option<{{ model_name }}>> {
// Find the first matching record to get its primary key
let record = self.find_first(nautilus_core::FindManyArgs {
where_: args.where_,
..Default::default()
}){% if is_async %}.await{% endif %}?;
let Some(record) = record else {
return Ok(None);
};
// Build a primary-key filter to delete exactly that one record.
// One `column = param` term is emitted per primary-key field; every term
// after the first is chained onto the previous ones with `.and(...)`.
{%- set pk_fields = scalar_fields | filter(attribute="is_pk", value=true) %}
let pk_filter = {% for f in pk_fields %}{% if not loop.first %}.and({% endif %}nautilus_core::Expr::column("{{ table_name }}__{{ f.db_name }}").eq(nautilus_core::Expr::param(record.{{ f.name }}.clone())){% if not loop.first %}){% endif %}{% endfor %};
{%- if not is_async %}
let _c = self.client.clone();
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{{ delete_name }}::new(_c).where_(pk_filter).exec().await?;
nautilus_core::Result::<()>::Ok(())
}))?;
{%- else %}
{{ delete_name }}::new(self.client.clone()).where_(pk_filter).exec().await?;
{%- endif %}
Ok(Some(record))
}
/// Delete every {{ model_name }} record matching `args.where_` and return them.
///
/// If `args.where_` is `None`, all rows are deleted. The statement is built
/// with `{{ delete_name }}` and executed immediately.
pub {% if is_async %}async {% endif %}fn delete_many(
    &self,
    args: {{ model_name }}DeleteArgs,
) -> nautilus_core::Result<Vec<{{ model_name }}>> {
{%- if not is_async %}
    let _c = self.client.clone();
    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
    let statement = {{ delete_name }}::new({% if is_async %}self.client.clone(){% else %}_c{% endif %});
    let statement = match args.where_ {
        Some(filter) => statement.where_(filter),
        None => statement.all(),
    };
    statement.exec().await
{%- if not is_async %}
    }))
{%- endif %}
}
/// Count {{ model_name }} records matching `args`.
pub {% if is_async %}async {% endif %}fn count(
&self,
args: {{ model_name }}CountArgs,
) -> nautilus_core::Result<i64> {
{%- if not is_async %}
let _c = self.client.clone();
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
if let Some(count) =
crate::runtime::try_count_via_engine(
{% if is_async %}&self.client{% else %}&_c{% endif %},
"{{ model_name }}",
args.to_engine_args()?,
)
.await?
{
return Ok(count);
}
Err(nautilus_core::Error::InvalidQuery(
"count queries require the embedded engine path in the generated Rust client"
.to_string(),
))
{%- if not is_async %}
}))
{%- endif %}
}
/// Group {{ model_name }} rows and compute aggregates.
pub {% if is_async %}async {% endif %}fn group_by(
&self,
args: {{ model_name }}GroupByArgs,
) -> nautilus_core::Result<Vec<{{ model_name }}GroupByOutput>> {
{%- if not is_async %}
let _c = self.client.clone();
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
let engine_args = args.to_engine_args()?;
if let Some(rows) =
crate::runtime::try_group_by_rows_via_engine(
{% if is_async %}&self.client{% else %}&_c{% endif %},
"{{ model_name }}",
engine_args,
)
.await?
{
let mut decoded = Vec::with_capacity(rows.len());
for row in &rows {
decoded.push({{ model_name }}GroupByOutput::from_row(row)?);
}
return Ok(decoded);
}
Err(nautilus_core::Error::InvalidQuery(
"groupBy queries require the embedded engine path in the generated Rust client"
.to_string(),
))
{%- if not is_async %}
}))
{%- endif %}
}
/// Execute a raw SQL string and return the result rows as generic maps.
///
/// The SQL is sent to the database as-is with no parameter binding.
/// Prefer `raw_stmt_query` when user-supplied values are involved to avoid
/// SQL injection.
///
/// This is `raw_stmt_query` with an empty parameter list, and is implemented
/// that way so the row-decoding loop exists in one place only.
pub {% if is_async %}async {% endif %}fn raw_query(
    &self,
    sql: impl Into<String>,
) -> nautilus_core::Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
    self.raw_stmt_query(sql, Vec::new()){% if is_async %}.await{% endif %}
}
/// Execute a raw prepared-statement query with bound parameters.
///
/// Use `$1`, `$2`, … (PostgreSQL) or `?` (MySQL / SQLite) as placeholders.
/// Parameters are bound in the order they appear in the `params` slice.
pub {% if is_async %}async {% endif %}fn raw_stmt_query(
&self,
sql: impl Into<String>,
params: Vec<nautilus_core::Value>,
) -> nautilus_core::Result<Vec<std::collections::HashMap<String, serde_json::Value>>> {
{%- if not is_async %}
let _c = self.client.clone();
tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(async move {
{%- endif %}
let sql_obj = nautilus_dialect::Sql { text: sql.into(), params };
let stream = {% if is_async %}self.client{% else %}_c{% endif %}.executor().execute(&sql_obj);
futures::pin_mut!(stream);
let mut results: Vec<std::collections::HashMap<String, serde_json::Value>> = Vec::new();
while let Some(result) = stream.next().await {
let row: crate::Row = result
.map_err(|e| nautilus_core::Error::Other(e.to_string()))?
.into();
let map = row
.iter()
.map(|(name, val)| {
let json_val = val.to_json_plain();
(name.to_string(), json_val)
})
.collect();
results.push(map);
}
Ok(results)
{%- if not is_async %}
}))
{%- endif %}
}
/// Create or update a {{ model_name }} record.
///
/// Looks up an existing record with `find_first` using `args.where_`. If one
/// is found, `update` is called with the same `args.where_` and `args.update`;
/// otherwise a new record is created from `args.create`.
///
/// Returns `None` when `args.return_data` is `false`. On the update path the
/// first updated row is returned, or the previously found record if `update`
/// returned no rows.
///
/// Because the update reuses `args.where_`, it applies to every row that
/// matches the filter; use a filter on a unique field.
pub {% if is_async %}async {% endif %}fn upsert(
    &self,
    args: {{ model_name }}UpsertArgs,
) -> nautilus_core::Result<Option<{{ model_name }}>> {
    let lookup = nautilus_core::FindManyArgs {
        where_: args.where_.clone(),
        ..Default::default()
    };
    let Some(record) = self.find_first(lookup){% if is_async %}.await{% endif %}? else {
        // No existing record: take the create path.
        let created = self.create(args.create){% if is_async %}.await{% endif %}?;
        return Ok(args.return_data.then_some(created));
    };

    let changes = {{ model_name }}UpdateArgs {
        where_: args.where_,
        data: args.update,
    };
    let updated = self.update(changes){% if is_async %}.await{% endif %}?;
    if !args.return_data {
        return Ok(None);
    }
    Ok(Some(updated.into_iter().next().unwrap_or(record)))
}
}