1#![warn(missing_docs)]
4
5use crate::{
10 DataSource, Expression, MyError, QString, config::config, ds::sql::MIN_DATE_SQL, expr::E,
11 op::Op,
12};
13use sqlx::{
14 AssertSqlSafe, FromRow, PgPool,
15 postgres::{PgConnectOptions, PgPoolOptions},
16};
17use tracing::debug;
18
/// Query that looks up a table in the `geometry_columns` relation by its
/// `f_table_name`; the table name is bound as the `$1` parameter.
const FIND_TABLE: &str = "SELECT * FROM geometry_columns WHERE f_table_name = $1";
/// Name of the collation this module creates with the ICU locale
/// `und-u-ks-level2`; used for case-insensitive string comparisons.
const CQL2_CI: &str = "cql2_ci";
/// Name of the collation this module creates with the ICU locale
/// `und-u-ks-level1-kc-true`; used for accent-insensitive string comparisons.
const CQL2_AI: &str = "cql2_ai";
/// Name of the collation this module creates with the ICU locale
/// `und-u-ks-level1`; used when a string is both case- and accent-insensitive.
const CQL2_CAI: &str = "cql2_ci_ai";
/// Collation attached to string literals that carry no flags.
// NOTE(review): presumably a collation shipped by the server itself (this
// module never creates it) -- confirm the minimum PostgreSQL version required.
const PG_UNICODE: &str = "pg_unicode_fast";
30
/// Single-column row wrapper; used to read the text result of
/// `SELECT PostGIS_Version();`.
#[derive(Debug, FromRow)]
struct Pragma(String);
34
/// One row of the `geometry_columns` relation as fetched by `FIND_TABLE`.
///
/// Only `srid` is read by this module; the remaining fields are decoded so the
/// row can be logged, hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, FromRow)]
pub(crate) struct TGeometryColumn {
    /// Name of the table owning the geometry column.
    f_table_name: String,
    /// Name of the geometry column itself.
    f_geometry_column: String,
    /// Value of the `coord_dimension` column.
    coord_dimension: i32,
    /// Value of the `srid` column; converted to `u32` when the data source is
    /// built.
    srid: i32,
    /// Value of the `type` column (renamed because `type` is a Rust keyword).
    #[sqlx(rename = "type")]
    type_: String,
}
46
/// A data source backed by a table of a PostgreSQL database reachable through
/// a connection pool.
#[derive(Debug)]
// `db_name` is stored but not read back within this module.
#[allow(dead_code)]
pub struct PGDataSource {
    /// Name of the database the pool is connected to.
    db_name: String,
    /// Name of the table this data source reads from.
    table: String,
    /// Connection pool used to run every query.
    pool: PgPool,
    /// SRID recorded for the table in `geometry_columns`.
    srid: u32,
}
57
impl DataSource for PGDataSource {
    /// Return the SRID read from `geometry_columns` when this data source was
    /// created. Always `Some` for this implementation.
    fn srid(&self) -> Option<u32> {
        Some(self.srid)
    }
}
63
64impl PGDataSource {
65 pub async fn from(db_name: &str, table: &str) -> Result<Self, MyError> {
67 let pg_server_url = config().pg_url();
69 let url = format!("{pg_server_url}/{db_name}");
70 let pool_opts = url
72 .parse::<PgConnectOptions>()?
73 .application_name(config().pg_appname());
74 let pool = PgPoolOptions::new()
76 .min_connections(config().pg_min_connections())
77 .max_connections(config().pg_max_connections())
78 .acquire_timeout(config().pg_acquire_timeout())
79 .idle_timeout(config().pg_idle_timeout())
80 .max_lifetime(config().pg_max_lifetime())
81 .connect_with(pool_opts)
82 .await?;
83
84 let pargma = sqlx::query_as::<_, Pragma>("SELECT PostGIS_Version();")
87 .fetch_one(&pool)
88 .await?;
89 let v = pargma.0;
90 debug!("PostGIS Version = {v}");
91
92 let row = sqlx::query_as::<_, TGeometryColumn>(FIND_TABLE)
94 .bind(table)
95 .fetch_one(&pool)
96 .await?;
97 debug!("geometry_column = {row:?}");
98 let srid = u32::try_from(row.srid)?;
100
101 let sql = "SET TIME ZONE 'UTC';";
103 let safe_sql = AssertSqlSafe(sql);
104 sqlx::query(safe_sql).execute(&pool).await?;
105
106 let sql = format!(
109 r#"CREATE COLLATION IF NOT EXISTS "{CQL2_CI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level2');"#
110 );
111 let safe_sql = AssertSqlSafe(sql);
112 sqlx::query(safe_sql).execute(&pool).await?;
113 let sql = format!(
115 r#"CREATE COLLATION IF NOT EXISTS "{CQL2_AI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level1-kc-true');"#
116 );
117 let safe_sql = AssertSqlSafe(sql);
118 sqlx::query(safe_sql).execute(&pool).await?;
119 let sql = format!(
121 r#"CREATE COLLATION IF NOT EXISTS "{CQL2_CAI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level1');"#
122 );
123 let safe_sql = AssertSqlSafe(sql);
124 sqlx::query(safe_sql).execute(&pool).await?;
125
126 Ok(Self {
127 db_name: db_name.to_owned(),
128 table: table.to_owned(),
129 pool,
130 srid,
131 })
132 }
133
    /// Return a reference to the connection pool backing this data source.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
138
139 pub fn table(&self) -> &str {
141 &self.table
142 }
143
144 pub fn to_sql(&self, exp: &Expression) -> Result<String, MyError> {
147 let mut e = exp.to_inner()?;
148 let reduced = E::reduce(&mut e)?;
149 self.to_sql_impl(reduced)
150 }
151
152 fn to_sql_impl(&self, exp: E) -> Result<String, MyError> {
153 match exp {
154 E::Null => Ok("NULL".to_owned()),
155 E::Unbounded => Ok(MIN_DATE_SQL.to_owned()),
156 E::Bool(true) => Ok("TRUE".to_owned()),
157 E::Bool(false) => Ok("FALSE".to_owned()),
158 E::Num(x) => Ok(x.to_string()),
159 E::Str(x) => qstr_to_sql(x),
160 E::Date(x) => Ok(format!("'{}'", x.date())),
161 E::Timestamp(x) => Ok(format!("'{}'", x.datetime())),
162 E::Spatial(x) => Ok(x.to_sql()?),
163 E::Id(x) => Ok(double_quoted(x)),
164 E::Monadic(op, x) if op.nullable() => {
166 let is_literal = x.is_literal_or_id();
167 let lhs = self.to_sql_impl(*x)?;
168 let z_op = op.to_sql();
169 if is_literal {
170 Ok(format!("{lhs} {z_op}"))
171 } else {
172 Ok(format!("({lhs}) {z_op}"))
173 }
174 }
175 E::Monadic(op, x) => match op {
176 Op::Neg | Op::Minus => {
177 let is_literal = x.is_literal_or_id();
178 let rhs = self.to_sql_impl(*x)?;
179 let z_op = op.to_sql();
180 if is_literal {
181 Ok(format!("{z_op} {rhs}"))
182 } else {
183 Ok(format!("{z_op} ({rhs})"))
184 }
185 }
186 Op::CaseI => match *x {
187 E::Monadic(Op::AccentI, y) => {
188 let rhs = self.to_sql_impl(*y)?;
189 Ok(format!("{rhs} COLLATE {CQL2_CAI}"))
190 }
191 _ => {
192 let rhs = self.to_sql_impl(*x)?;
193 Ok(format!("{rhs} COLLATE {CQL2_CI}"))
194 }
195 },
196 Op::AccentI => match *x {
197 E::Monadic(Op::CaseI, y) => {
198 let rhs = self.to_sql_impl(*y)?;
199 Ok(format!("{rhs} COLLATE {CQL2_CAI}"))
200 }
201 _ => {
202 let rhs = self.to_sql_impl(*x)?;
203 Ok(format!("{rhs} COLLATE {CQL2_AI}"))
204 }
205 },
206 x => unreachable!("Unexpected ({x}) monadic operator"),
207 },
208 E::Dyadic(op, a, b)
209 if matches!(op, Op::IsBetween) || matches!(op, Op::IsNotBetween) =>
210 {
211 match *b {
213 E::Array(rhs) => {
214 let z_op = op.to_sql();
215 let lhs = self.to_sql_impl(*a)?;
216 let lo = self.to_sql_impl(rhs[0].to_owned())?;
217 let hi = self.to_sql_impl(rhs[1].to_owned())?;
218 Ok(format!("{lhs} {z_op} {lo} AND {hi}"))
219 }
220 _ => unreachable!("Expetced [NOT] BETWEEN's RHS expression to be an array"),
221 }
222 }
223 E::Dyadic(op, a, b) if op.spatial() => match op {
224 Op::SWithin | Op::SOverlaps | Op::STouches => self.reduce_precision(op, *a, *b),
225 _ => {
226 let lhs = self.to_sql_impl(*a)?;
227 let rhs = self.to_sql_impl(*b)?;
228 let z_op = op.to_sql();
229 Ok(format!("{z_op}({lhs}, {rhs})"))
230 }
231 },
232 E::Dyadic(op, a, b) if op.temporal() => match op {
233 Op::TAfter => self.t_after_sql(*a, *b),
234 Op::TBefore => self.t_before_sql(*a, *b),
235 Op::TDisjoint => self.t_disjoint_sql(*a, *b),
236 Op::TEquals => self.t_equals_sql(*a, *b),
237 Op::TIntersects => self.t_intersects_sql(*a, *b),
238
239 Op::TContains => self.t_contains_sql(*a, *b),
240 Op::TDuring => self.t_during_sql(*a, *b),
241 Op::TFinishedBy => self.t_finished_by_sql(*a, *b),
242 Op::TFinishes => self.t_finishes_sql(*a, *b),
243 Op::TMeets => self.t_meets_sql(*a, *b),
244 Op::TMetBy => self.t_met_by_sql(*a, *b),
245 Op::TOverlappedBy => self.t_overlapped_by_sql(*a, *b),
246 Op::TOverlaps => self.t_overlaps_sql(*a, *b),
247 Op::TStartedBy => self.t_started_by_sql(*a, *b),
248 Op::TStarts => self.t_starts_sql(*a, *b),
249 x => unreachable!("Unexpected ({x:?}) operator"),
250 },
251 E::Dyadic(op, a, b) if op.array() => {
252 let z_op = op.to_sql();
253 let lhs = self.to_sql_impl(*a)?;
254 let rhs = self.to_sql_impl(*b)?;
255 Ok(format!("{lhs} {z_op} {rhs}"))
256 }
257 E::Dyadic(op, a, b) if matches!(op, Op::IsLike) || matches!(op, Op::IsNotLike) => {
258 let a_is_literal = a.is_literal_or_id();
259 let lhs = self.to_sql_impl(*a)?;
260 let rhs = self.to_sql_impl(*b)?;
261 let z_op = op.to_sql();
262 match a_is_literal {
263 true => Ok(format!("{lhs} {z_op} ({rhs})")),
264 false => Ok(format!("({lhs}) {z_op} ({rhs})")),
265 }
266 }
267 E::Dyadic(op, a, b) => {
268 let a_is_literal = a.is_literal_or_id();
269 let b_is_literal = b.is_literal_or_id();
270 let lhs = self.to_sql_impl(*a)?;
271 let rhs = self.to_sql_impl(*b)?;
272 let z_op = op.to_sql();
273 match (a_is_literal, b_is_literal) {
274 (true, true) => Ok(format!("{lhs} {z_op} {rhs}")),
275 (true, false) => Ok(format!("{lhs} {z_op} ({rhs})")),
276 (false, true) => Ok(format!("({lhs}) {z_op} {rhs}")),
277 (false, false) => Ok(format!("({lhs}) {z_op} ({rhs})")),
278 }
279 }
280 E::Function(x) => {
281 let params: Result<Vec<String>, MyError> =
282 x.params.into_iter().map(|x| self.to_sql_impl(x)).collect();
283 let params_ = params?;
284 Ok(format!("{}({})", x.name, params_.join(", ")))
285 }
286 E::Array(x) => {
287 let items: Result<Vec<String>, MyError> =
288 x.into_iter().map(|x| self.to_sql_impl(x)).collect();
289 let items_ = items?;
290 Ok(format!("({})", items_.join(", ")))
291 }
292 x => unreachable!("{x:?} cannot be transformed to SQL"),
293 }
294 }
295
296 fn reduce_precision(&self, op: Op, a: E, b: E) -> Result<String, MyError> {
297 let a_is_id = a.is_id();
298 let b_is_id = b.is_id();
299 let (lhs, rhs) = match (a_is_id, b_is_id) {
300 (true, false) => {
301 let lhs = self.reduce_precision_sql(a)?;
302 let rhs = self.to_sql_impl(b)?;
303 (lhs, rhs)
304 }
305 (false, true) => {
306 let lhs = self.to_sql_impl(a)?;
307 let rhs = self.reduce_precision_sql(b)?;
308 (lhs, rhs)
309 }
310 _ => {
311 let lhs = self.to_sql_impl(a)?;
312 let rhs = self.to_sql_impl(b)?;
313 (lhs, rhs)
314 }
315 };
316 let z_op = op.to_sql();
317 Ok(format!("{z_op}({lhs}, {rhs})"))
318 }
319
320 fn reduce_precision_sql(&self, a: E) -> Result<String, MyError> {
321 let it = format!(
322 "ST_ReducePrecision({}, 1E-{})",
323 self.to_sql_impl(a)?,
324 config().default_precision()
325 );
326 Ok(it)
327 }
328
329 fn t_after_sql(&self, a: E, b: E) -> Result<String, MyError> {
331 let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
332 match (a_is_interval, b_is_interval) {
333 (false, false) => Ok(format!(
334 "{} > {}",
335 self.to_sql_impl(e0)?,
336 self.to_sql_impl(e2)?
337 )),
338 (false, true) => {
340 let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
341 let sql = crate::check_ids!(e2, base);
342 Ok(sql)
343 }
344 (true, false) => {
345 let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e2)?);
346 let sql = crate::check_ids!(e1, base);
347 Ok(sql)
348 }
349 (true, true) => {
350 let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
351 let sql = crate::check_ids!(e1, e2, base);
352 Ok(sql)
353 }
354 }
355 }
356
357 fn t_before_sql(&self, a: E, b: E) -> Result<String, MyError> {
358 let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
359 match (a_is_interval, b_is_interval) {
360 (false, false) => Ok(format!(
361 "{} < {}",
362 self.to_sql_impl(e0)?,
363 self.to_sql_impl(e2)?
364 )),
365 (false, true) => {
366 let base = format!("{} < {}", self.to_sql_impl(e0)?, self.to_sql_impl(e2)?);
367 let sql = crate::check_ids!(e3, base);
368 Ok(sql)
369 }
370 (true, false) => {
371 let base = format!("{} < {}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
372 let sql = crate::check_ids!(e0, base);
373 Ok(sql)
374 }
375 (true, true) => {
376 let base = format!("{} < {}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
377 let sql = crate::check_ids!(e0, e3, base);
378 Ok(sql)
379 }
380 }
381 }
382
383 fn t_disjoint_sql(&self, a: E, b: E) -> Result<String, MyError> {
384 let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
385 match (a_is_interval, b_is_interval) {
386 (false, false) => Ok(format!(
387 "{} != {}",
388 self.to_sql_impl(e0)?,
389 self.to_sql_impl(e2)?
390 )),
391 (false, true) => {
392 let e2_ = e2.clone();
393 let e3_ = e3.clone();
394 let s0 = self.to_sql_impl(e0)?;
395 let s2 = self.to_sql_impl(e2)?;
396 let s3 = self.to_sql_impl(e3)?;
397 let base1 = format!("{s0} < {s2}");
398 let sql1 = crate::check_ids!(e3_, base1);
399 let base2 = format!("{s0} > {s3}");
400 let sql2 = crate::check_ids!(e2_, base2);
401 Ok(format!("({sql1}) OR ({sql2})"))
402 }
403 (true, false) => {
404 let e0_ = e0.clone();
405 let e1_ = e1.clone();
406 let s0 = self.to_sql_impl(e0)?;
407 let s1 = self.to_sql_impl(e1)?;
408 let s2 = self.to_sql_impl(e2)?;
409 let base1 = format!("{s1} < {s2}");
410 let sql1 = crate::check_ids!(e0_, base1);
411 let base2 = format!("{s0} > {s2}");
412 let sql2 = crate::check_ids!(e1_, base2);
413 Ok(format!("({sql1}) OR ({sql2})"))
414 }
415 (true, true) => {
416 let e0_ = e0.clone();
417 let e1_ = e1.clone();
418 let e2_ = e2.clone();
419 let e3_ = e3.clone();
420 let s0 = self.to_sql_impl(e0)?;
421 let s1 = self.to_sql_impl(e1)?;
422 let s2 = self.to_sql_impl(e2)?;
423 let s3 = self.to_sql_impl(e3)?;
424 let base1 = format!("{s1} < {s2}");
425 let sql1 = crate::check_ids!(e0_, e3_, base1);
426 let base2 = format!("{s0} > {s3}");
427 let sql2 = crate::check_ids!(e1_, e2_, base2);
428 Ok(format!("({sql1}) OR ({sql2})"))
429 }
430 }
431 }
432
433 fn t_equals_sql(&self, a: E, b: E) -> Result<String, MyError> {
434 let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
435 match (a_is_interval, b_is_interval) {
436 (false, false) => Ok(format!(
437 "{} = {}",
438 self.to_sql_impl(e0)?,
439 self.to_sql_impl(e2)?
440 )),
441 (false, true) => Ok(format!(
442 "({0} = {1}) AND ({0} = {2})",
443 self.to_sql_impl(e0)?,
444 self.to_sql_impl(e2)?,
445 self.to_sql_impl(e3)?
446 )),
447 (true, false) => Ok(format!(
448 "({0} = {2}) AND ({1} = {2})",
449 self.to_sql_impl(e0)?,
450 self.to_sql_impl(e1)?,
451 self.to_sql_impl(e2)?
452 )),
453 (true, true) => Ok(format!(
454 "({0} = {2}) AND ({1} = {3})",
455 self.to_sql_impl(e0)?,
456 self.to_sql_impl(e1)?,
457 self.to_sql_impl(e2)?,
458 self.to_sql_impl(e3)?
459 )),
460 }
461 }
462
463 fn t_intersects_sql(&self, a: E, b: E) -> Result<String, MyError> {
464 let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
465 match (a_is_interval, b_is_interval) {
466 (false, false) => Ok(format!(
467 "{} = {}",
468 self.to_sql_impl(e0)?,
469 self.to_sql_impl(e2)?
470 )),
471 (false, true) => Ok(format!(
472 "NOT(({0} < {1}) OR ({0} > {2}))",
473 self.to_sql_impl(e0)?,
474 self.to_sql_impl(e2)?,
475 self.to_sql_impl(e3)?
476 )),
477 (true, false) => Ok(format!(
478 "NOT(({1} < {2}) OR ({0} > {2}))",
479 self.to_sql_impl(e0)?,
480 self.to_sql_impl(e1)?,
481 self.to_sql_impl(e2)?
482 )),
483 (true, true) => Ok(format!(
484 "NOT(({1} < {2}) OR ({0} > {3}))",
485 self.to_sql_impl(e0)?,
486 self.to_sql_impl(e1)?,
487 self.to_sql_impl(e2)?,
488 self.to_sql_impl(e3)?
489 )),
490 }
491 }
492
493 fn t_contains_sql(&self, a: E, b: E) -> Result<String, MyError> {
495 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
496 Ok(format!(
497 "({0} < {2}) AND ({1} > {3})",
498 self.to_sql_impl(e0)?,
499 self.to_sql_impl(e1)?,
500 self.to_sql_impl(e2)?,
501 self.to_sql_impl(e3)?
502 ))
503 }
504
505 fn t_during_sql(&self, a: E, b: E) -> Result<String, MyError> {
506 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
507 Ok(format!(
508 "({0} > {2}) AND ({1} < {3})",
509 self.to_sql_impl(e0)?,
510 self.to_sql_impl(e1)?,
511 self.to_sql_impl(e2)?,
512 self.to_sql_impl(e3)?
513 ))
514 }
515
516 fn t_finished_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
517 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
518 Ok(format!(
519 "({0} < {2}) AND ({1} = {3})",
520 self.to_sql_impl(e0)?,
521 self.to_sql_impl(e1)?,
522 self.to_sql_impl(e2)?,
523 self.to_sql_impl(e3)?
524 ))
525 }
526
527 fn t_finishes_sql(&self, a: E, b: E) -> Result<String, MyError> {
528 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
529 Ok(format!(
530 "({0} > {2}) AND ({1} = {3})",
531 self.to_sql_impl(e0)?,
532 self.to_sql_impl(e1)?,
533 self.to_sql_impl(e2)?,
534 self.to_sql_impl(e3)?
535 ))
536 }
537
538 fn t_meets_sql(&self, a: E, b: E) -> Result<String, MyError> {
539 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
540 let base = format!("{0} = {1}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
541 let sql = crate::check_ids!(e0, e3, base);
542 Ok(sql)
543 }
544
545 fn t_met_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
546 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
547 let base = format!("{0} = {1}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
548 let sql = crate::check_ids!(e1, e2, base);
549 Ok(sql)
550 }
551
552 fn t_overlapped_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
553 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
554 Ok(format!(
555 "({0} > {2}) AND ({0} < {3}) AND ({1} > {3})",
556 self.to_sql_impl(e0)?,
557 self.to_sql_impl(e1)?,
558 self.to_sql_impl(e2)?,
559 self.to_sql_impl(e3)?
560 ))
561 }
562
563 fn t_overlaps_sql(&self, a: E, b: E) -> Result<String, MyError> {
564 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
565 Ok(format!(
566 "({0} < {2}) AND ({1} > {2}) AND ({1} < {3})",
567 self.to_sql_impl(e0)?,
568 self.to_sql_impl(e1)?,
569 self.to_sql_impl(e2)?,
570 self.to_sql_impl(e3)?
571 ))
572 }
573
574 fn t_started_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
575 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
576 Ok(format!(
577 "({0} = {2}) AND ({1} > {3})",
578 self.to_sql_impl(e0)?,
579 self.to_sql_impl(e1)?,
580 self.to_sql_impl(e2)?,
581 self.to_sql_impl(e3)?
582 ))
583 }
584
585 fn t_starts_sql(&self, a: E, b: E) -> Result<String, MyError> {
586 let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
587 Ok(format!(
588 "({0} = {2}) AND ({1} < {3})",
589 self.to_sql_impl(e0)?,
590 self.to_sql_impl(e1)?,
591 self.to_sql_impl(e2)?,
592 self.to_sql_impl(e3)?
593 ))
594 }
595}
596
/// Return `s` as a double-quoted SQL identifier.
///
/// A name that is already wrapped in a pair of double quotes keeps its outer
/// quotes; anything else is wrapped. In both cases every double quote inside
/// the name ends up doubled, so the result is always exactly one quoted
/// identifier and can never terminate the quoting early. Well-formed input
/// (`abc`, `"abc"`, `"a""b"`) is rendered exactly as before.
fn double_quoted(s: String) -> String {
    // A lone `"` is *not* an already-quoted name: stripping the prefix leaves
    // nothing to strip a suffix from, so it falls back to the raw text.
    let name = s
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'))
        .unwrap_or(s.as_str());
    // Collapse pairs that are already escaped before escaping, so the
    // operation is idempotent.
    let escaped = name.replace("\"\"", "\"").replace('"', "\"\"");
    format!("\"{escaped}\"")
}
606
607fn qstr_to_sql(qs: QString) -> Result<String, MyError> {
608 match qs.flags() {
609 0 => Ok(format!(r#"'{}' COLLATE "{PG_UNICODE}""#, qs.inner())),
610 1 => Ok(format!(r#"'{}' COLLATE "{CQL2_CI}""#, qs.inner())),
611 2 => Ok(format!(r#"'{}' COLLATE "{CQL2_AI}""#, qs.inner())),
612 3 => Ok(format!(r#"'{}' COLLATE "{CQL2_CAI}""#, qs.inner())),
613 x => {
614 let msg = format!("String w/ '{x}' flags has NO direct SQL representation");
615 Err(MyError::Runtime(msg.into()))
616 }
617 }
618}
619
/// Generate a concrete, streamable PostgreSQL data source type.
///
/// Arguments, in order:
/// * the visibility of the generated type and of its inherent methods,
/// * a name; the generated type is called `<name>PG`,
/// * the database name handed to `PGDataSource::from`,
/// * the table name,
/// * the row type rows are decoded into.
///
/// The expansion refers to `PGDataSource`, `MyError`, `Resource`, `Error`,
/// `Expression` and `StreamableDS` by their bare names, and calls `map_err`,
/// `try_filter_map` and `boxed` on streams, so all of those (including the
/// stream extension traits providing the three methods) must be in scope at
/// the call site.
#[macro_export]
macro_rules! gen_pg_ds {
    ($vis:vis, $name:expr, $db_url:expr, $table:expr, $feature:expr) => {
        ::paste::paste! {
            /// A PostgreSQL data source bound to one table and one row type.
            $vis struct [<$name PG>](PGDataSource);

            impl [<$name PG>] {
                /// Connect to the database and build the data source.
                $vis async fn new() -> Result<Self, MyError> {
                    let ds = PGDataSource::from($db_url, $table).await?;
                    Ok(Self(ds))
                }

                /// Convert a row into a `Resource`.
                $vis fn to_resource(r: $feature) -> Result<Resource, Box<dyn Error>> {
                    let row = $feature::try_from(r)?;
                    Ok(Resource::try_from(row)?)
                }

                /// Return the name of the underlying table.
                $vis fn table(&self) -> &str {
                    self.0.table()
                }

                /// Return the wrapped `PGDataSource`.
                $vis fn inner(&self) -> &PGDataSource {
                    &self.0
                }
            }

            impl ::core::fmt::Display for [<$name PG>] {
                fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                    write!(f, "{}PG({})", $name, $table)
                }
            }

            #[::async_trait::async_trait]
            impl StreamableDS for [<$name PG>] {
                type Item = $feature;
                type Err = MyError;

                /// Stream every row of the table.
                async fn fetch(
                    &self
                ) -> Result<::futures::stream::BoxStream<'_, Result<$feature, MyError>>, MyError> {
                    // Quote the table name exactly like `fetch_where` does, so
                    // both queries resolve the same relation even when the
                    // name is mixed-case or otherwise needs quoting.
                    let sql = format!(r#"SELECT * FROM "{}";"#, self.table());
                    let safe_sql = ::sqlx::AssertSqlSafe(sql);
                    let it = sqlx::query_as::<_, $feature>(safe_sql)
                        .fetch(self.0.pool())
                        .map_err(MyError::SQL);
                    Ok(Box::pin(it))
                }

                /// Stream every row of the table, converted to a `Resource`.
                async fn stream(
                    &self
                ) -> Result<::futures::stream::BoxStream<'_, Result<Resource, MyError>>, MyError> {
                    let rows = self.fetch().await?;
                    let resources = rows
                        .try_filter_map(|row| async move { Resource::try_from(row).map(Some) })
                        .boxed();
                    Ok(resources)
                }

                /// Stream the rows satisfying `exp`, which is translated to a
                /// SQL `WHERE` clause.
                async fn fetch_where(
                    &self,
                    exp: &Expression,
                ) -> Result<::futures::stream::BoxStream<'_, Result<$feature, MyError>>, MyError> {
                    let where_clause = self.0.to_sql(exp)?;
                    let sql = format!(r#"SELECT * FROM "{}" WHERE {};"#, self.table(), where_clause);
                    tracing::debug!("-- sql = {sql}");
                    let safe_sql = ::sqlx::AssertSqlSafe(sql);
                    let it = sqlx::query_as::<_, $feature>(safe_sql)
                        .fetch(self.0.pool())
                        .map_err(MyError::SQL);
                    Ok(Box::pin(it))
                }

                /// Stream the rows satisfying `exp`, converted to a `Resource`.
                async fn stream_where(
                    &self,
                    exp: &Expression,
                ) -> Result<::futures::stream::BoxStream<'_, Result<Resource, MyError>>, MyError> {
                    let rows = self.fetch_where(exp).await?;
                    let resources = rows
                        .try_filter_map(|row| async move { Resource::try_from(row).map(Some) })
                        .boxed();
                    Ok(resources)
                }
            }
        }
    };
}