pgrx_sql_entity_graph/aggregate/
entity.rs1use crate::aggregate::options::{FinalizeModify, ParallelOption};
20use crate::fmt;
21use crate::metadata::{SqlArrayMapping, SqlMapping};
22use crate::pgrx_sql::PgrxSql;
23use crate::to_sql::ToSql;
24use crate::to_sql::entity::ToSqlConfigEntity;
25use crate::{SqlGraphEntity, SqlGraphIdentifier, UsedTypeEntity};
26use eyre::{WrapErr, eyre};
27
28#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
29pub struct AggregateTypeEntity<'a> {
30 pub used_ty: UsedTypeEntity<'a>,
31 pub name: Option<&'a str>,
32}
33
34#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
35pub struct PgAggregateEntity<'a> {
36 pub full_path: &'a str,
37 pub module_path: &'a str,
38 pub file: &'a str,
39 pub line: u32,
40
41 pub name: &'a str,
42
43 pub ordered_set: bool,
47
48 pub args: Vec<AggregateTypeEntity<'a>>,
52
53 pub direct_args: Option<Vec<AggregateTypeEntity<'a>>>,
57
58 pub stype: AggregateTypeEntity<'a>,
62
63 pub sfunc: &'a str,
67
68 pub finalfunc: Option<&'a str>,
72
73 pub finalfunc_modify: Option<FinalizeModify>,
77
78 pub combinefunc: Option<&'a str>,
82
83 pub serialfunc: Option<&'a str>,
87
88 pub deserialfunc: Option<&'a str>,
92
93 pub initcond: Option<&'a str>,
97
98 pub msfunc: Option<&'a str>,
102
103 pub minvfunc: Option<&'a str>,
107
108 pub mstype: Option<UsedTypeEntity<'a>>,
112
113 pub mfinalfunc: Option<&'a str>,
121
122 pub mfinalfunc_modify: Option<FinalizeModify>,
126
127 pub minitcond: Option<&'a str>,
131
132 pub sortop: Option<&'a str>,
136
137 pub parallel: Option<ParallelOption>,
141
142 pub hypothetical: bool,
146 pub to_sql_config: ToSqlConfigEntity<'a>,
147}
148
149impl<'a> From<PgAggregateEntity<'a>> for SqlGraphEntity<'a> {
150 fn from(val: PgAggregateEntity<'a>) -> Self {
151 SqlGraphEntity::Aggregate(val)
152 }
153}
154
155impl SqlGraphIdentifier for PgAggregateEntity<'_> {
156 fn dot_identifier(&self) -> String {
157 format!("aggregate {}", self.full_path)
158 }
159 fn rust_identifier(&self) -> String {
160 self.full_path.to_string()
161 }
162 fn file(&self) -> Option<&str> {
163 Some(self.file)
164 }
165 fn line(&self) -> Option<u32> {
166 Some(self.line)
167 }
168}
169
170fn aggregate_sql_type(mapping: &SqlMapping, composite_type: Option<&str>) -> eyre::Result<String> {
171 match mapping {
172 SqlMapping::As(sql) => Ok(sql.clone()),
173 SqlMapping::Composite => composite_type
174 .map(ToString::to_string)
175 .ok_or_else(|| eyre!("Composite mapping requires composite_type")),
176 SqlMapping::Array(SqlArrayMapping::As(sql)) => Ok(fmt::with_array_brackets(sql.clone(), 1)),
177 SqlMapping::Array(SqlArrayMapping::Composite) => composite_type
178 .map(ToString::to_string)
179 .map(|sql| fmt::with_array_brackets(sql, 1))
180 .ok_or_else(|| eyre!("Composite mapping requires composite_type")),
181 SqlMapping::Skip => {
182 Err(eyre!("Cannot use skipped SQL translatable type as aggregate const type"))
183 }
184 }
185}
186
187impl ToSql for PgAggregateEntity<'_> {
188 fn to_sql(&self, context: &PgrxSql) -> eyre::Result<String> {
189 let self_index = context.aggregates[self];
190 let mut optional_attributes = Vec::new();
191 let schema = context.schema_prefix_for(&self_index);
192
193 if let Some(value) = self.finalfunc {
194 optional_attributes.push((
195 format!("\tFINALFUNC = {schema}\"{value}\""),
196 format!("/* {}::final */", self.full_path),
197 ));
198 }
199 if let Some(value) = self.finalfunc_modify {
200 optional_attributes.push((
201 format!("\tFINALFUNC_MODIFY = {}", value.to_sql(context)?),
202 format!("/* {}::FINALIZE_MODIFY */", self.full_path),
203 ));
204 }
205 if let Some(value) = self.combinefunc {
206 optional_attributes.push((
207 format!("\tCOMBINEFUNC = {schema}\"{value}\""),
208 format!("/* {}::combine */", self.full_path),
209 ));
210 }
211 if let Some(value) = self.serialfunc {
212 optional_attributes.push((
213 format!("\tSERIALFUNC = {schema}\"{value}\""),
214 format!("/* {}::serial */", self.full_path),
215 ));
216 }
217 if let Some(value) = self.deserialfunc {
218 optional_attributes.push((
219 format!("\tDESERIALFUNC ={schema} \"{value}\""),
220 format!("/* {}::deserial */", self.full_path),
221 ));
222 }
223 if let Some(value) = self.initcond {
224 optional_attributes.push((
225 format!("\tINITCOND = '{value}'"),
226 format!("/* {}::INITIAL_CONDITION */", self.full_path),
227 ));
228 }
229 if let Some(value) = self.msfunc {
230 optional_attributes.push((
231 format!("\tMSFUNC = {schema}\"{value}\""),
232 format!("/* {}::moving_state */", self.full_path),
233 ));
234 }
235 if let Some(value) = self.minvfunc {
236 optional_attributes.push((
237 format!("\tMINVFUNC = {schema}\"{value}\""),
238 format!("/* {}::moving_state_inverse */", self.full_path),
239 ));
240 }
241 if let Some(value) = self.mfinalfunc {
242 optional_attributes.push((
243 format!("\tMFINALFUNC = {schema}\"{value}\""),
244 format!("/* {}::moving_state_finalize */", self.full_path),
245 ));
246 }
247 if let Some(value) = self.mfinalfunc_modify {
248 optional_attributes.push((
249 format!("\tMFINALFUNC_MODIFY = {}", value.to_sql(context)?),
250 format!("/* {}::MOVING_FINALIZE_MODIFY */", self.full_path),
251 ));
252 }
253 if let Some(value) = self.minitcond {
254 optional_attributes.push((
255 format!("\tMINITCOND = '{value}'"),
256 format!("/* {}::MOVING_INITIAL_CONDITION */", self.full_path),
257 ));
258 }
259 if let Some(value) = self.sortop {
260 optional_attributes.push((
261 format!("\tSORTOP = \"{value}\""),
262 format!("/* {}::SORT_OPERATOR */", self.full_path),
263 ));
264 }
265 if let Some(value) = self.parallel {
266 optional_attributes.push((
267 format!("\tPARALLEL = {}", value.to_sql(context)?),
268 format!("/* {}::PARALLEL */", self.full_path),
269 ));
270 }
271 if self.hypothetical {
272 optional_attributes.push((
273 String::from("\tHYPOTHETICAL"),
274 format!("/* {}::hypothetical */", self.full_path),
275 ))
276 }
277
278 let map_ty = |used_ty: &UsedTypeEntity| -> eyre::Result<String> {
279 match used_ty.metadata.argument_sql {
280 Ok(ref mapping) => aggregate_sql_type(mapping, used_ty.composite_type),
281 Err(err) => Err(err).wrap_err("While mapping argument"),
282 }
283 };
284
285 let sql_type_for_slot = |slot: &str,
286 used_ty: &UsedTypeEntity|
287 -> eyre::Result<(String, String)> {
288 let sql = map_ty(used_ty).wrap_err_with(|| format!("Mapping {slot}"))?;
289 let schema_prefix = context.schema_prefix_for_used_type(&self_index, slot, used_ty)?;
290 Ok((schema_prefix, sql))
291 };
292 let (stype_schema, stype_sql) = sql_type_for_slot("STYPE", &self.stype.used_ty)?;
293
294 if let Some(value) = &self.mstype {
295 let (mstype_schema, mstype_sql) = sql_type_for_slot("MSTYPE", value)?;
296 optional_attributes.push((
297 format!("\tMSTYPE = {mstype_schema}{mstype_sql}"),
298 format!("/* {}::MovingState = {} */", self.full_path, value.full_path),
299 ));
300 }
301
302 let mut optional_attributes_string = String::new();
303 for (index, (optional_attribute, comment)) in optional_attributes.iter().enumerate() {
304 let optional_attribute_string = format!(
305 "{optional_attribute}{maybe_comma} {comment}{maybe_newline}",
306 optional_attribute = optional_attribute,
307 maybe_comma = if index == optional_attributes.len() - 1 { "" } else { "," },
308 comment = comment,
309 maybe_newline = if index == optional_attributes.len() - 1 { "" } else { "\n" }
310 );
311 optional_attributes_string += &optional_attribute_string;
312 }
313
314 let args = {
315 let mut args = Vec::new();
316 for (idx, arg) in self.args.iter().enumerate() {
317 let needs_comma = idx < (self.args.len() - 1);
318 let schema_prefix = context.schema_prefix_for_used_type(
319 &self_index,
320 arg.name.unwrap_or("aggregate argument"),
321 &arg.used_ty,
322 )?;
323 let buf = format!(
324 "\
325 \t{name}{variadic}{schema_prefix}{sql_type}{maybe_comma}/* {full_path} */\
326 ",
327 schema_prefix = schema_prefix,
328 sql_type = match arg.used_ty.metadata.argument_sql {
330 Ok(ref mapping) => aggregate_sql_type(mapping, arg.used_ty.composite_type)?,
331 Err(err) => return Err(err).wrap_err("While mapping argument"),
332 },
333 variadic = if arg.used_ty.variadic { "VARIADIC " } else { "" },
334 maybe_comma = if needs_comma { ", " } else { " " },
335 full_path = arg.used_ty.full_path,
336 name = if let Some(name) = arg.name {
337 format!(r#""{name}" "#)
338 } else {
339 "".to_string()
340 },
341 );
342 args.push(buf);
343 }
344 "\n".to_string() + &args.join("\n") + "\n"
345 };
346 let direct_args = if let Some(direct_args) = &self.direct_args {
347 let mut args = Vec::new();
348 for (idx, arg) in direct_args.iter().enumerate() {
349 let schema_prefix = context.schema_prefix_for_used_type(
350 &self_index,
351 arg.name.unwrap_or("aggregate direct argument"),
352 &arg.used_ty,
353 )?;
354 let needs_comma = idx < (direct_args.len() - 1);
355 let buf = format!(
356 "\
357 \t{maybe_name}{schema_prefix}{sql_type}{maybe_comma}/* {full_path} */\
358 ",
359 schema_prefix = schema_prefix,
360 sql_type = map_ty(&arg.used_ty).wrap_err("Mapping direct arg type")?,
362 maybe_name = if let Some(name) = arg.name {
363 "\"".to_string() + name + "\" "
364 } else {
365 "".to_string()
366 },
367 maybe_comma = if needs_comma { ", " } else { " " },
368 full_path = arg.used_ty.full_path,
369 );
370 args.push(buf);
371 }
372 "\n".to_string() + &args.join("\n") + "\n"
373 } else {
374 String::default()
375 };
376
377 let PgAggregateEntity { name, full_path, file, line, sfunc, .. } = self;
378
379 let sql = format!(
380 "\n\
381 -- {file}:{line}\n\
382 -- {full_path}\n\
383 CREATE AGGREGATE {schema}{name} ({direct_args}{maybe_order_by}{args})\n\
384 (\n\
385 \tSFUNC = {schema}\"{sfunc}\", /* {full_path}::state */\n\
386 \tSTYPE = {stype_schema}{stype_sql}{maybe_comma_after_stype} /* {stype_full_path} */\
387 {optional_attributes}\
388 );\
389 ",
390 stype_full_path = self.stype.used_ty.full_path,
391 maybe_comma_after_stype = if optional_attributes.is_empty() { "" } else { "," },
392 maybe_order_by = if self.ordered_set { "\tORDER BY" } else { "" },
393 optional_attributes = String::from("\n")
394 + &optional_attributes_string
395 + if optional_attributes.is_empty() { "" } else { "\n" },
396 );
397 Ok(sql)
398 }
399}