Skip to main content

pgrx_sql_entity_graph/aggregate/
entity.rs

1//LICENSE Portions Copyright 2019-2021 ZomboDB, LLC.
2//LICENSE
3//LICENSE Portions Copyright 2021-2023 Technology Concepts & Design, Inc.
4//LICENSE
5//LICENSE Portions Copyright 2023-2023 PgCentral Foundation, Inc. <contact@pgcentral.org>
6//LICENSE
7//LICENSE All rights reserved.
8//LICENSE
9//LICENSE Use of this source code is governed by the MIT license that can be found in the LICENSE file.
/*!

`#[pg_aggregate]` related entities for Rust to SQL translation

> Like all of the [`sql_entity_graph`][crate] APIs, this is considered **internal**
> to the `pgrx` framework and very subject to change between versions. While you may use this, please do so with caution.


*/
19use crate::aggregate::options::{FinalizeModify, ParallelOption};
20use crate::fmt;
21use crate::metadata::{SqlArrayMapping, SqlMapping};
22use crate::pgrx_sql::PgrxSql;
23use crate::to_sql::ToSql;
24use crate::to_sql::entity::ToSqlConfigEntity;
25use crate::{SqlGraphEntity, SqlGraphIdentifier, UsedTypeEntity};
26use eyre::{WrapErr, eyre};
27
28#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
29pub struct AggregateTypeEntity<'a> {
30    pub used_ty: UsedTypeEntity<'a>,
31    pub name: Option<&'a str>,
32}
33
34#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
35pub struct PgAggregateEntity<'a> {
36    pub full_path: &'a str,
37    pub module_path: &'a str,
38    pub file: &'a str,
39    pub line: u32,
40
41    pub name: &'a str,
42
43    /// If the aggregate is an ordered set aggregate.
44    ///
45    /// See [the PostgreSQL ordered set docs](https://www.postgresql.org/docs/current/xaggr.html#XAGGR-ORDERED-SET-AGGREGATES).
46    pub ordered_set: bool,
47
48    /// The `arg_data_type` list.
49    ///
50    /// Corresponds to `Args` in `pgrx::aggregate::Aggregate`.
51    pub args: Vec<AggregateTypeEntity<'a>>,
52
53    /// The direct argument list, appearing before `ORDER BY` in ordered set aggregates.
54    ///
55    /// Corresponds to `OrderBy` in `pgrx::aggregate::Aggregate`.
56    pub direct_args: Option<Vec<AggregateTypeEntity<'a>>>,
57
58    /// The `STYPE` and `name` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
59    ///
60    /// The implementor of an `pgrx::aggregate::Aggregate`.
61    pub stype: AggregateTypeEntity<'a>,
62
63    /// The `SFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
64    ///
65    /// Corresponds to `state` in `pgrx::aggregate::Aggregate`.
66    pub sfunc: &'a str,
67
68    /// The `FINALFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
69    ///
70    /// Corresponds to `finalize` in `pgrx::aggregate::Aggregate`.
71    pub finalfunc: Option<&'a str>,
72
73    /// The `FINALFUNC_MODIFY` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
74    ///
75    /// Corresponds to `FINALIZE_MODIFY` in `pgrx::aggregate::Aggregate`.
76    pub finalfunc_modify: Option<FinalizeModify>,
77
78    /// The `COMBINEFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
79    ///
80    /// Corresponds to `combine` in `pgrx::aggregate::Aggregate`.
81    pub combinefunc: Option<&'a str>,
82
83    /// The `SERIALFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
84    ///
85    /// Corresponds to `serial` in `pgrx::aggregate::Aggregate`.
86    pub serialfunc: Option<&'a str>,
87
88    /// The `DESERIALFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
89    ///
90    /// Corresponds to `deserial` in `pgrx::aggregate::Aggregate`.
91    pub deserialfunc: Option<&'a str>,
92
93    /// The `INITCOND` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
94    ///
95    /// Corresponds to `INITIAL_CONDITION` in `pgrx::aggregate::Aggregate`.
96    pub initcond: Option<&'a str>,
97
98    /// The `MSFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
99    ///
100    /// Corresponds to `moving_state` in `pgrx::aggregate::Aggregate`.
101    pub msfunc: Option<&'a str>,
102
103    /// The `MINVFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
104    ///
105    /// Corresponds to `moving_state_inverse` in `pgrx::aggregate::Aggregate`.
106    pub minvfunc: Option<&'a str>,
107
108    /// The `MSTYPE` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
109    ///
110    /// Corresponds to `MovingState` in `pgrx::aggregate::Aggregate`.
111    pub mstype: Option<UsedTypeEntity<'a>>,
112
113    // The `MSSPACE` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
114    //
115    // TODO: Currently unused.
116    // pub msspace: &'a str,
117    /// The `MFINALFUNC` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
118    ///
119    /// Corresponds to `moving_state_finalize` in `pgrx::aggregate::Aggregate`.
120    pub mfinalfunc: Option<&'a str>,
121
122    /// The `MFINALFUNC_MODIFY` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
123    ///
124    /// Corresponds to `MOVING_FINALIZE_MODIFY` in `pgrx::aggregate::Aggregate`.
125    pub mfinalfunc_modify: Option<FinalizeModify>,
126
127    /// The `MINITCOND` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
128    ///
129    /// Corresponds to `MOVING_INITIAL_CONDITION` in `pgrx::aggregate::Aggregate`.
130    pub minitcond: Option<&'a str>,
131
132    /// The `SORTOP` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
133    ///
134    /// Corresponds to `SORT_OPERATOR` in `pgrx::aggregate::Aggregate`.
135    pub sortop: Option<&'a str>,
136
137    /// The `PARALLEL` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
138    ///
139    /// Corresponds to `PARALLEL` in `pgrx::aggregate::Aggregate`.
140    pub parallel: Option<ParallelOption>,
141
142    /// The `HYPOTHETICAL` parameter for [`CREATE AGGREGATE`](https://www.postgresql.org/docs/current/sql-createaggregate.html)
143    ///
144    /// Corresponds to `hypothetical` in `pgrx::aggregate::Aggregate`.
145    pub hypothetical: bool,
146    pub to_sql_config: ToSqlConfigEntity<'a>,
147}
148
149impl<'a> From<PgAggregateEntity<'a>> for SqlGraphEntity<'a> {
150    fn from(val: PgAggregateEntity<'a>) -> Self {
151        SqlGraphEntity::Aggregate(val)
152    }
153}
154
155impl SqlGraphIdentifier for PgAggregateEntity<'_> {
156    fn dot_identifier(&self) -> String {
157        format!("aggregate {}", self.full_path)
158    }
159    fn rust_identifier(&self) -> String {
160        self.full_path.to_string()
161    }
162    fn file(&self) -> Option<&str> {
163        Some(self.file)
164    }
165    fn line(&self) -> Option<u32> {
166        Some(self.line)
167    }
168}
169
170fn aggregate_sql_type(mapping: &SqlMapping, composite_type: Option<&str>) -> eyre::Result<String> {
171    match mapping {
172        SqlMapping::As(sql) => Ok(sql.clone()),
173        SqlMapping::Composite => composite_type
174            .map(ToString::to_string)
175            .ok_or_else(|| eyre!("Composite mapping requires composite_type")),
176        SqlMapping::Array(SqlArrayMapping::As(sql)) => Ok(fmt::with_array_brackets(sql.clone(), 1)),
177        SqlMapping::Array(SqlArrayMapping::Composite) => composite_type
178            .map(ToString::to_string)
179            .map(|sql| fmt::with_array_brackets(sql, 1))
180            .ok_or_else(|| eyre!("Composite mapping requires composite_type")),
181        SqlMapping::Skip => {
182            Err(eyre!("Cannot use skipped SQL translatable type as aggregate const type"))
183        }
184    }
185}
186
187impl ToSql for PgAggregateEntity<'_> {
188    fn to_sql(&self, context: &PgrxSql) -> eyre::Result<String> {
189        let self_index = context.aggregates[self];
190        let mut optional_attributes = Vec::new();
191        let schema = context.schema_prefix_for(&self_index);
192
193        if let Some(value) = self.finalfunc {
194            optional_attributes.push((
195                format!("\tFINALFUNC = {schema}\"{value}\""),
196                format!("/* {}::final */", self.full_path),
197            ));
198        }
199        if let Some(value) = self.finalfunc_modify {
200            optional_attributes.push((
201                format!("\tFINALFUNC_MODIFY = {}", value.to_sql(context)?),
202                format!("/* {}::FINALIZE_MODIFY */", self.full_path),
203            ));
204        }
205        if let Some(value) = self.combinefunc {
206            optional_attributes.push((
207                format!("\tCOMBINEFUNC = {schema}\"{value}\""),
208                format!("/* {}::combine */", self.full_path),
209            ));
210        }
211        if let Some(value) = self.serialfunc {
212            optional_attributes.push((
213                format!("\tSERIALFUNC = {schema}\"{value}\""),
214                format!("/* {}::serial */", self.full_path),
215            ));
216        }
217        if let Some(value) = self.deserialfunc {
218            optional_attributes.push((
219                format!("\tDESERIALFUNC ={schema} \"{value}\""),
220                format!("/* {}::deserial */", self.full_path),
221            ));
222        }
223        if let Some(value) = self.initcond {
224            optional_attributes.push((
225                format!("\tINITCOND = '{value}'"),
226                format!("/* {}::INITIAL_CONDITION */", self.full_path),
227            ));
228        }
229        if let Some(value) = self.msfunc {
230            optional_attributes.push((
231                format!("\tMSFUNC = {schema}\"{value}\""),
232                format!("/* {}::moving_state */", self.full_path),
233            ));
234        }
235        if let Some(value) = self.minvfunc {
236            optional_attributes.push((
237                format!("\tMINVFUNC = {schema}\"{value}\""),
238                format!("/* {}::moving_state_inverse */", self.full_path),
239            ));
240        }
241        if let Some(value) = self.mfinalfunc {
242            optional_attributes.push((
243                format!("\tMFINALFUNC = {schema}\"{value}\""),
244                format!("/* {}::moving_state_finalize */", self.full_path),
245            ));
246        }
247        if let Some(value) = self.mfinalfunc_modify {
248            optional_attributes.push((
249                format!("\tMFINALFUNC_MODIFY = {}", value.to_sql(context)?),
250                format!("/* {}::MOVING_FINALIZE_MODIFY */", self.full_path),
251            ));
252        }
253        if let Some(value) = self.minitcond {
254            optional_attributes.push((
255                format!("\tMINITCOND = '{value}'"),
256                format!("/* {}::MOVING_INITIAL_CONDITION */", self.full_path),
257            ));
258        }
259        if let Some(value) = self.sortop {
260            optional_attributes.push((
261                format!("\tSORTOP = \"{value}\""),
262                format!("/* {}::SORT_OPERATOR */", self.full_path),
263            ));
264        }
265        if let Some(value) = self.parallel {
266            optional_attributes.push((
267                format!("\tPARALLEL = {}", value.to_sql(context)?),
268                format!("/* {}::PARALLEL */", self.full_path),
269            ));
270        }
271        if self.hypothetical {
272            optional_attributes.push((
273                String::from("\tHYPOTHETICAL"),
274                format!("/* {}::hypothetical */", self.full_path),
275            ))
276        }
277
278        let map_ty = |used_ty: &UsedTypeEntity| -> eyre::Result<String> {
279            match used_ty.metadata.argument_sql {
280                Ok(ref mapping) => aggregate_sql_type(mapping, used_ty.composite_type),
281                Err(err) => Err(err).wrap_err("While mapping argument"),
282            }
283        };
284
285        let sql_type_for_slot = |slot: &str,
286                                 used_ty: &UsedTypeEntity|
287         -> eyre::Result<(String, String)> {
288            let sql = map_ty(used_ty).wrap_err_with(|| format!("Mapping {slot}"))?;
289            let schema_prefix = context.schema_prefix_for_used_type(&self_index, slot, used_ty)?;
290            Ok((schema_prefix, sql))
291        };
292        let (stype_schema, stype_sql) = sql_type_for_slot("STYPE", &self.stype.used_ty)?;
293
294        if let Some(value) = &self.mstype {
295            let (mstype_schema, mstype_sql) = sql_type_for_slot("MSTYPE", value)?;
296            optional_attributes.push((
297                format!("\tMSTYPE = {mstype_schema}{mstype_sql}"),
298                format!("/* {}::MovingState = {} */", self.full_path, value.full_path),
299            ));
300        }
301
302        let mut optional_attributes_string = String::new();
303        for (index, (optional_attribute, comment)) in optional_attributes.iter().enumerate() {
304            let optional_attribute_string = format!(
305                "{optional_attribute}{maybe_comma} {comment}{maybe_newline}",
306                optional_attribute = optional_attribute,
307                maybe_comma = if index == optional_attributes.len() - 1 { "" } else { "," },
308                comment = comment,
309                maybe_newline = if index == optional_attributes.len() - 1 { "" } else { "\n" }
310            );
311            optional_attributes_string += &optional_attribute_string;
312        }
313
314        let args = {
315            let mut args = Vec::new();
316            for (idx, arg) in self.args.iter().enumerate() {
317                let needs_comma = idx < (self.args.len() - 1);
318                let schema_prefix = context.schema_prefix_for_used_type(
319                    &self_index,
320                    arg.name.unwrap_or("aggregate argument"),
321                    &arg.used_ty,
322                )?;
323                let buf = format!(
324                    "\
325                       \t{name}{variadic}{schema_prefix}{sql_type}{maybe_comma}/* {full_path} */\
326                   ",
327                    schema_prefix = schema_prefix,
328                    // The SQL spelling comes from the embedded schema metadata.
329                    sql_type = match arg.used_ty.metadata.argument_sql {
330                        Ok(ref mapping) => aggregate_sql_type(mapping, arg.used_ty.composite_type)?,
331                        Err(err) => return Err(err).wrap_err("While mapping argument"),
332                    },
333                    variadic = if arg.used_ty.variadic { "VARIADIC " } else { "" },
334                    maybe_comma = if needs_comma { ", " } else { " " },
335                    full_path = arg.used_ty.full_path,
336                    name = if let Some(name) = arg.name {
337                        format!(r#""{name}" "#)
338                    } else {
339                        "".to_string()
340                    },
341                );
342                args.push(buf);
343            }
344            "\n".to_string() + &args.join("\n") + "\n"
345        };
346        let direct_args = if let Some(direct_args) = &self.direct_args {
347            let mut args = Vec::new();
348            for (idx, arg) in direct_args.iter().enumerate() {
349                let schema_prefix = context.schema_prefix_for_used_type(
350                    &self_index,
351                    arg.name.unwrap_or("aggregate direct argument"),
352                    &arg.used_ty,
353                )?;
354                let needs_comma = idx < (direct_args.len() - 1);
355                let buf = format!(
356                    "\
357                    \t{maybe_name}{schema_prefix}{sql_type}{maybe_comma}/* {full_path} */\
358                   ",
359                    schema_prefix = schema_prefix,
360                    // The SQL spelling comes from the embedded schema metadata.
361                    sql_type = map_ty(&arg.used_ty).wrap_err("Mapping direct arg type")?,
362                    maybe_name = if let Some(name) = arg.name {
363                        "\"".to_string() + name + "\" "
364                    } else {
365                        "".to_string()
366                    },
367                    maybe_comma = if needs_comma { ", " } else { " " },
368                    full_path = arg.used_ty.full_path,
369                );
370                args.push(buf);
371            }
372            "\n".to_string() + &args.join("\n") + "\n"
373        } else {
374            String::default()
375        };
376
377        let PgAggregateEntity { name, full_path, file, line, sfunc, .. } = self;
378
379        let sql = format!(
380            "\n\
381                -- {file}:{line}\n\
382                -- {full_path}\n\
383                CREATE AGGREGATE {schema}{name} ({direct_args}{maybe_order_by}{args})\n\
384                (\n\
385                    \tSFUNC = {schema}\"{sfunc}\", /* {full_path}::state */\n\
386                    \tSTYPE = {stype_schema}{stype_sql}{maybe_comma_after_stype} /* {stype_full_path} */\
387                    {optional_attributes}\
388                );\
389            ",
390            stype_full_path = self.stype.used_ty.full_path,
391            maybe_comma_after_stype = if optional_attributes.is_empty() { "" } else { "," },
392            maybe_order_by = if self.ordered_set { "\tORDER BY" } else { "" },
393            optional_attributes = String::from("\n")
394                + &optional_attributes_string
395                + if optional_attributes.is_empty() { "" } else { "\n" },
396        );
397        Ok(sql)
398    }
399}