Skip to main content

ogc_cql2/ds/
pg.rs

1// SPDX-License-Identifier: Apache-2.0
2
3#![warn(missing_docs)]
4
5//! Artifacts specific to handling geospatial data stored in a PostGIS database
6//! table.
7//!
8
9use crate::{
10    DataSource, Expression, MyError, QString, config::config, ds::sql::MIN_DATE_SQL, expr::E,
11    op::Op,
12};
13use sqlx::{
14    AssertSqlSafe, FromRow, PgPool,
15    postgres::{PgConnectOptions, PgPoolOptions},
16};
17use tracing::debug;
18
// Query looking up the `geometry_columns` row(s) of the table whose name is
// bound to `$1`.
const FIND_TABLE: &str = "SELECT * FROM geometry_columns WHERE f_table_name = $1";
// Name of the collation that is case-insensitive. For PostgreSQL that's
// level 2.
const CQL2_CI: &str = "cql2_ci";
// Name of the collation that is accent-insensitive. For PostgreSQL that's
// level 2.5.
const CQL2_AI: &str = "cql2_ai";
// Name of the collation that is both case- and accent-insensitive. For
// PostgreSQL that's level 1.
const CQL2_CAI: &str = "cql2_ci_ai";
// Name of the PostgreSQL builtin collation attached to plain string literals
// (those w/ neither a case- nor an accent-insensitivity flag) so that Unicode
// string comparisons are ordered correctly.
const PG_UNICODE: &str = "pg_unicode_fast";
30
// Single-column row wrapper used to read back one textual value; e.g. the
// result of `SELECT PostGIS_Version();`.
#[derive(Debug, FromRow)]
struct Pragma(String);
34
// Partial representation of a PostGIS `geometry_columns` table row. Only the
// columns named below are mapped.
// Not every field is read after the row is fetched, hence the `dead_code`
// allowance.
#[allow(dead_code)]
#[derive(Debug, FromRow)]
pub(crate) struct TGeometryColumn {
    // name of the table owning the geometry column.
    f_table_name: String,
    // name of the geometry column itself.
    f_geometry_column: String,
    // value of the `coord_dimension` column.
    coord_dimension: i32,
    // value of the `srid` column; converted to `u32` by the data source
    // constructor.
    srid: i32,
    // `type` is a Rust keyword, so the column is mapped to `type_`.
    #[sqlx(rename = "type")]
    type_: String,
}
46
/// [`DataSource`] binding a _PostGIS_ enabled database + a table name that
/// maps rows to _Features_ and [Resources][crate::Resource].
///
/// Instances are created w/ [`PGDataSource::from`] which opens the connection
/// pool and reads the table's SRID from the `geometry_columns` table.
#[derive(Debug)]
// some fields (e.g. `db_name`) are only kept for diagnostics...
#[allow(dead_code)]
pub struct PGDataSource {
    // name of the database this source is connected to.
    db_name: String,
    // name of the table housing the features.
    table: String,
    // connection pool used for every query issued by this source.
    pool: PgPool,
    // SRID recorded in `geometry_columns` for the table.
    srid: u32,
}
57
58impl DataSource for PGDataSource {
59    fn srid(&self) -> Option<u32> {
60        Some(self.srid)
61    }
62}
63
64impl PGDataSource {
65    /// Constructor.
66    pub async fn from(db_name: &str, table: &str) -> Result<Self, MyError> {
67        // concatenate server URL w/ DB name...
68        let pg_server_url = config().pg_url();
69        let url = format!("{pg_server_url}/{db_name}");
70        // parse it to start w/ a useful connection options instance...
71        let pool_opts = url
72            .parse::<PgConnectOptions>()?
73            .application_name(config().pg_appname());
74        // configure connection parameters + make a pool...
75        let pool = PgPoolOptions::new()
76            .min_connections(config().pg_min_connections())
77            .max_connections(config().pg_max_connections())
78            .acquire_timeout(config().pg_acquire_timeout())
79            .idle_timeout(config().pg_idle_timeout())
80            .max_lifetime(config().pg_max_lifetime())
81            .connect_with(pool_opts)
82            .await?;
83
84        // ensure DB has PostGIS extension installed...  do this by selecting
85        // the PostGIS_Version() function.  an OK result will suffice for now...
86        let pargma = sqlx::query_as::<_, Pragma>("SELECT PostGIS_Version();")
87            .fetch_one(&pool)
88            .await?;
89        let v = pargma.0;
90        debug!("PostGIS Version = {v}");
91
92        // ensure table exists, has a geometry column and an SRID.
93        let row = sqlx::query_as::<_, TGeometryColumn>(FIND_TABLE)
94            .bind(table)
95            .fetch_one(&pool)
96            .await?;
97        debug!("geometry_column = {row:?}");
98        // ...
99        let srid = u32::try_from(row.srid)?;
100
101        // set time zone to UTC...
102        let sql = "SET TIME ZONE 'UTC';";
103        let safe_sql = AssertSqlSafe(sql);
104        sqlx::query(safe_sql).execute(&pool).await?;
105
106        // create collations...
107        // ignore case
108        let sql = format!(
109            r#"CREATE COLLATION IF NOT EXISTS "{CQL2_CI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level2');"#
110        );
111        let safe_sql = AssertSqlSafe(sql);
112        sqlx::query(safe_sql).execute(&pool).await?;
113        // ignore accents...
114        let sql = format!(
115            r#"CREATE COLLATION IF NOT EXISTS "{CQL2_AI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level1-kc-true');"#
116        );
117        let safe_sql = AssertSqlSafe(sql);
118        sqlx::query(safe_sql).execute(&pool).await?;
119        // ignore both...
120        let sql = format!(
121            r#"CREATE COLLATION IF NOT EXISTS "{CQL2_CAI}" (provider = icu, deterministic = false, locale = 'und-u-ks-level1');"#
122        );
123        let safe_sql = AssertSqlSafe(sql);
124        sqlx::query(safe_sql).execute(&pool).await?;
125
126        Ok(Self {
127            db_name: db_name.to_owned(),
128            table: table.to_owned(),
129            pool,
130            srid,
131        })
132    }
133
134    /// Return this pool.
135    pub fn pool(&self) -> &PgPool {
136        &self.pool
137    }
138
139    /// Return name of the table housing the data.
140    pub fn table(&self) -> &str {
141        &self.table
142    }
143
144    /// Transform given [Expression] to an SQL _WHERE_ clause that can be used
145    /// for selecting a subset of this data source items.
146    pub fn to_sql(&self, exp: &Expression) -> Result<String, MyError> {
147        let mut e = exp.to_inner()?;
148        let reduced = E::reduce(&mut e)?;
149        self.to_sql_impl(reduced)
150    }
151
152    fn to_sql_impl(&self, exp: E) -> Result<String, MyError> {
153        match exp {
154            E::Null => Ok("NULL".to_owned()),
155            E::Unbounded => Ok(MIN_DATE_SQL.to_owned()),
156            E::Bool(true) => Ok("TRUE".to_owned()),
157            E::Bool(false) => Ok("FALSE".to_owned()),
158            E::Num(x) => Ok(x.to_string()),
159            E::Str(x) => qstr_to_sql(x),
160            E::Date(x) => Ok(format!("'{}'", x.date())),
161            E::Timestamp(x) => Ok(format!("'{}'", x.datetime())),
162            E::Spatial(x) => Ok(x.to_sql()?),
163            E::Id(x) => Ok(double_quoted(x)),
164            // some work need to be done when handling these options...
165            E::Monadic(op, x) if op.nullable() => {
166                let is_literal = x.is_literal_or_id();
167                let lhs = self.to_sql_impl(*x)?;
168                let z_op = op.to_sql();
169                if is_literal {
170                    Ok(format!("{lhs} {z_op}"))
171                } else {
172                    Ok(format!("({lhs}) {z_op}"))
173                }
174            }
175            E::Monadic(op, x) => match op {
176                Op::Neg | Op::Minus => {
177                    let is_literal = x.is_literal_or_id();
178                    let rhs = self.to_sql_impl(*x)?;
179                    let z_op = op.to_sql();
180                    if is_literal {
181                        Ok(format!("{z_op} {rhs}"))
182                    } else {
183                        Ok(format!("{z_op} ({rhs})"))
184                    }
185                }
186                Op::CaseI => match *x {
187                    E::Monadic(Op::AccentI, y) => {
188                        let rhs = self.to_sql_impl(*y)?;
189                        Ok(format!("{rhs} COLLATE {CQL2_CAI}"))
190                    }
191                    _ => {
192                        let rhs = self.to_sql_impl(*x)?;
193                        Ok(format!("{rhs} COLLATE {CQL2_CI}"))
194                    }
195                },
196                Op::AccentI => match *x {
197                    E::Monadic(Op::CaseI, y) => {
198                        let rhs = self.to_sql_impl(*y)?;
199                        Ok(format!("{rhs} COLLATE {CQL2_CAI}"))
200                    }
201                    _ => {
202                        let rhs = self.to_sql_impl(*x)?;
203                        Ok(format!("{rhs} COLLATE {CQL2_AI}"))
204                    }
205                },
206                x => unreachable!("Unexpected ({x}) monadic operator"),
207            },
208            E::Dyadic(op, a, b)
209                if matches!(op, Op::IsBetween) || matches!(op, Op::IsNotBetween) =>
210            {
211                // RHS of [NOT] BETWEEN is an array of 2 numeric expressions...
212                match *b {
213                    E::Array(rhs) => {
214                        let z_op = op.to_sql();
215                        let lhs = self.to_sql_impl(*a)?;
216                        let lo = self.to_sql_impl(rhs[0].to_owned())?;
217                        let hi = self.to_sql_impl(rhs[1].to_owned())?;
218                        Ok(format!("{lhs} {z_op} {lo} AND {hi}"))
219                    }
220                    _ => unreachable!("Expetced [NOT] BETWEEN's RHS expression to be an array"),
221                }
222            }
223            E::Dyadic(op, a, b) if op.spatial() => match op {
224                Op::SWithin | Op::SOverlaps | Op::STouches => self.reduce_precision(op, *a, *b),
225                _ => {
226                    let lhs = self.to_sql_impl(*a)?;
227                    let rhs = self.to_sql_impl(*b)?;
228                    let z_op = op.to_sql();
229                    Ok(format!("{z_op}({lhs}, {rhs})"))
230                }
231            },
232            E::Dyadic(op, a, b) if op.temporal() => match op {
233                Op::TAfter => self.t_after_sql(*a, *b),
234                Op::TBefore => self.t_before_sql(*a, *b),
235                Op::TDisjoint => self.t_disjoint_sql(*a, *b),
236                Op::TEquals => self.t_equals_sql(*a, *b),
237                Op::TIntersects => self.t_intersects_sql(*a, *b),
238
239                Op::TContains => self.t_contains_sql(*a, *b),
240                Op::TDuring => self.t_during_sql(*a, *b),
241                Op::TFinishedBy => self.t_finished_by_sql(*a, *b),
242                Op::TFinishes => self.t_finishes_sql(*a, *b),
243                Op::TMeets => self.t_meets_sql(*a, *b),
244                Op::TMetBy => self.t_met_by_sql(*a, *b),
245                Op::TOverlappedBy => self.t_overlapped_by_sql(*a, *b),
246                Op::TOverlaps => self.t_overlaps_sql(*a, *b),
247                Op::TStartedBy => self.t_started_by_sql(*a, *b),
248                Op::TStarts => self.t_starts_sql(*a, *b),
249                x => unreachable!("Unexpected ({x:?}) operator"),
250            },
251            E::Dyadic(op, a, b) if op.array() => {
252                let z_op = op.to_sql();
253                let lhs = self.to_sql_impl(*a)?;
254                let rhs = self.to_sql_impl(*b)?;
255                Ok(format!("{lhs} {z_op} {rhs}"))
256            }
257            E::Dyadic(op, a, b) if matches!(op, Op::IsLike) || matches!(op, Op::IsNotLike) => {
258                let a_is_literal = a.is_literal_or_id();
259                let lhs = self.to_sql_impl(*a)?;
260                let rhs = self.to_sql_impl(*b)?;
261                let z_op = op.to_sql();
262                match a_is_literal {
263                    true => Ok(format!("{lhs} {z_op} ({rhs})")),
264                    false => Ok(format!("({lhs}) {z_op} ({rhs})")),
265                }
266            }
267            E::Dyadic(op, a, b) => {
268                let a_is_literal = a.is_literal_or_id();
269                let b_is_literal = b.is_literal_or_id();
270                let lhs = self.to_sql_impl(*a)?;
271                let rhs = self.to_sql_impl(*b)?;
272                let z_op = op.to_sql();
273                match (a_is_literal, b_is_literal) {
274                    (true, true) => Ok(format!("{lhs} {z_op} {rhs}")),
275                    (true, false) => Ok(format!("{lhs} {z_op} ({rhs})")),
276                    (false, true) => Ok(format!("({lhs}) {z_op} {rhs}")),
277                    (false, false) => Ok(format!("({lhs}) {z_op} ({rhs})")),
278                }
279            }
280            E::Function(x) => {
281                let params: Result<Vec<String>, MyError> =
282                    x.params.into_iter().map(|x| self.to_sql_impl(x)).collect();
283                let params_ = params?;
284                Ok(format!("{}({})", x.name, params_.join(", ")))
285            }
286            E::Array(x) => {
287                let items: Result<Vec<String>, MyError> =
288                    x.into_iter().map(|x| self.to_sql_impl(x)).collect();
289                let items_ = items?;
290                Ok(format!("({})", items_.join(", ")))
291            }
292            x => unreachable!("{x:?} cannot be transformed to SQL"),
293        }
294    }
295
296    fn reduce_precision(&self, op: Op, a: E, b: E) -> Result<String, MyError> {
297        let a_is_id = a.is_id();
298        let b_is_id = b.is_id();
299        let (lhs, rhs) = match (a_is_id, b_is_id) {
300            (true, false) => {
301                let lhs = self.reduce_precision_sql(a)?;
302                let rhs = self.to_sql_impl(b)?;
303                (lhs, rhs)
304            }
305            (false, true) => {
306                let lhs = self.to_sql_impl(a)?;
307                let rhs = self.reduce_precision_sql(b)?;
308                (lhs, rhs)
309            }
310            _ => {
311                let lhs = self.to_sql_impl(a)?;
312                let rhs = self.to_sql_impl(b)?;
313                (lhs, rhs)
314            }
315        };
316        let z_op = op.to_sql();
317        Ok(format!("{z_op}({lhs}, {rhs})"))
318    }
319
320    fn reduce_precision_sql(&self, a: E) -> Result<String, MyError> {
321        let it = format!(
322            "ST_ReducePrecision({}, 1E-{})",
323            self.to_sql_impl(a)?,
324            config().default_precision()
325        );
326        Ok(it)
327    }
328
329    // mixed (instant and interval) arguments...
330    fn t_after_sql(&self, a: E, b: E) -> Result<String, MyError> {
331        let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
332        match (a_is_interval, b_is_interval) {
333            (false, false) => Ok(format!(
334                "{} > {}",
335                self.to_sql_impl(e0)?,
336                self.to_sql_impl(e2)?
337            )),
338            // w/ the remaining cases, we may need additional xxx IS NOT NULL fragments...
339            (false, true) => {
340                let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
341                let sql = crate::check_ids!(e2, base);
342                Ok(sql)
343            }
344            (true, false) => {
345                let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e2)?);
346                let sql = crate::check_ids!(e1, base);
347                Ok(sql)
348            }
349            (true, true) => {
350                let base = format!("{} > {}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
351                let sql = crate::check_ids!(e1, e2, base);
352                Ok(sql)
353            }
354        }
355    }
356
357    fn t_before_sql(&self, a: E, b: E) -> Result<String, MyError> {
358        let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
359        match (a_is_interval, b_is_interval) {
360            (false, false) => Ok(format!(
361                "{} < {}",
362                self.to_sql_impl(e0)?,
363                self.to_sql_impl(e2)?
364            )),
365            (false, true) => {
366                let base = format!("{} < {}", self.to_sql_impl(e0)?, self.to_sql_impl(e2)?);
367                let sql = crate::check_ids!(e3, base);
368                Ok(sql)
369            }
370            (true, false) => {
371                let base = format!("{} < {}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
372                let sql = crate::check_ids!(e0, base);
373                Ok(sql)
374            }
375            (true, true) => {
376                let base = format!("{} < {}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
377                let sql = crate::check_ids!(e0, e3, base);
378                Ok(sql)
379            }
380        }
381    }
382
383    fn t_disjoint_sql(&self, a: E, b: E) -> Result<String, MyError> {
384        let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
385        match (a_is_interval, b_is_interval) {
386            (false, false) => Ok(format!(
387                "{} != {}",
388                self.to_sql_impl(e0)?,
389                self.to_sql_impl(e2)?
390            )),
391            (false, true) => {
392                let e2_ = e2.clone();
393                let e3_ = e3.clone();
394                let s0 = self.to_sql_impl(e0)?;
395                let s2 = self.to_sql_impl(e2)?;
396                let s3 = self.to_sql_impl(e3)?;
397                let base1 = format!("{s0} < {s2}");
398                let sql1 = crate::check_ids!(e3_, base1);
399                let base2 = format!("{s0} > {s3}");
400                let sql2 = crate::check_ids!(e2_, base2);
401                Ok(format!("({sql1}) OR ({sql2})"))
402            }
403            (true, false) => {
404                let e0_ = e0.clone();
405                let e1_ = e1.clone();
406                let s0 = self.to_sql_impl(e0)?;
407                let s1 = self.to_sql_impl(e1)?;
408                let s2 = self.to_sql_impl(e2)?;
409                let base1 = format!("{s1} < {s2}");
410                let sql1 = crate::check_ids!(e0_, base1);
411                let base2 = format!("{s0} > {s2}");
412                let sql2 = crate::check_ids!(e1_, base2);
413                Ok(format!("({sql1}) OR ({sql2})"))
414            }
415            (true, true) => {
416                let e0_ = e0.clone();
417                let e1_ = e1.clone();
418                let e2_ = e2.clone();
419                let e3_ = e3.clone();
420                let s0 = self.to_sql_impl(e0)?;
421                let s1 = self.to_sql_impl(e1)?;
422                let s2 = self.to_sql_impl(e2)?;
423                let s3 = self.to_sql_impl(e3)?;
424                let base1 = format!("{s1} < {s2}");
425                let sql1 = crate::check_ids!(e0_, e3_, base1);
426                let base2 = format!("{s0} > {s3}");
427                let sql2 = crate::check_ids!(e1_, e2_, base2);
428                Ok(format!("({sql1}) OR ({sql2})"))
429            }
430        }
431    }
432
433    fn t_equals_sql(&self, a: E, b: E) -> Result<String, MyError> {
434        let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
435        match (a_is_interval, b_is_interval) {
436            (false, false) => Ok(format!(
437                "{} = {}",
438                self.to_sql_impl(e0)?,
439                self.to_sql_impl(e2)?
440            )),
441            (false, true) => Ok(format!(
442                "({0} = {1}) AND ({0} = {2})",
443                self.to_sql_impl(e0)?,
444                self.to_sql_impl(e2)?,
445                self.to_sql_impl(e3)?
446            )),
447            (true, false) => Ok(format!(
448                "({0} = {2}) AND ({1} = {2})",
449                self.to_sql_impl(e0)?,
450                self.to_sql_impl(e1)?,
451                self.to_sql_impl(e2)?
452            )),
453            (true, true) => Ok(format!(
454                "({0} = {2}) AND ({1} = {3})",
455                self.to_sql_impl(e0)?,
456                self.to_sql_impl(e1)?,
457                self.to_sql_impl(e2)?,
458                self.to_sql_impl(e3)?
459            )),
460        }
461    }
462
463    fn t_intersects_sql(&self, a: E, b: E) -> Result<String, MyError> {
464        let (a_is_interval, b_is_interval, e0, e1, e2, e3) = crate::unfold_expressions!(a, b);
465        match (a_is_interval, b_is_interval) {
466            (false, false) => Ok(format!(
467                "{} = {}",
468                self.to_sql_impl(e0)?,
469                self.to_sql_impl(e2)?
470            )),
471            (false, true) => Ok(format!(
472                "NOT(({0} < {1}) OR ({0} > {2}))",
473                self.to_sql_impl(e0)?,
474                self.to_sql_impl(e2)?,
475                self.to_sql_impl(e3)?
476            )),
477            (true, false) => Ok(format!(
478                "NOT(({1} < {2}) OR ({0} > {2}))",
479                self.to_sql_impl(e0)?,
480                self.to_sql_impl(e1)?,
481                self.to_sql_impl(e2)?
482            )),
483            (true, true) => Ok(format!(
484                "NOT(({1} < {2}) OR ({0} > {3}))",
485                self.to_sql_impl(e0)?,
486                self.to_sql_impl(e1)?,
487                self.to_sql_impl(e2)?,
488                self.to_sql_impl(e3)?
489            )),
490        }
491    }
492
493    // intervals only...
494    fn t_contains_sql(&self, a: E, b: E) -> Result<String, MyError> {
495        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
496        Ok(format!(
497            "({0} < {2}) AND ({1} > {3})",
498            self.to_sql_impl(e0)?,
499            self.to_sql_impl(e1)?,
500            self.to_sql_impl(e2)?,
501            self.to_sql_impl(e3)?
502        ))
503    }
504
505    fn t_during_sql(&self, a: E, b: E) -> Result<String, MyError> {
506        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
507        Ok(format!(
508            "({0} > {2}) AND ({1} < {3})",
509            self.to_sql_impl(e0)?,
510            self.to_sql_impl(e1)?,
511            self.to_sql_impl(e2)?,
512            self.to_sql_impl(e3)?
513        ))
514    }
515
516    fn t_finished_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
517        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
518        Ok(format!(
519            "({0} < {2}) AND ({1} = {3})",
520            self.to_sql_impl(e0)?,
521            self.to_sql_impl(e1)?,
522            self.to_sql_impl(e2)?,
523            self.to_sql_impl(e3)?
524        ))
525    }
526
527    fn t_finishes_sql(&self, a: E, b: E) -> Result<String, MyError> {
528        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
529        Ok(format!(
530            "({0} > {2}) AND ({1} = {3})",
531            self.to_sql_impl(e0)?,
532            self.to_sql_impl(e1)?,
533            self.to_sql_impl(e2)?,
534            self.to_sql_impl(e3)?
535        ))
536    }
537
538    fn t_meets_sql(&self, a: E, b: E) -> Result<String, MyError> {
539        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
540        let base = format!("{0} = {1}", self.to_sql_impl(e1)?, self.to_sql_impl(e2)?);
541        let sql = crate::check_ids!(e0, e3, base);
542        Ok(sql)
543    }
544
545    fn t_met_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
546        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
547        let base = format!("{0} = {1}", self.to_sql_impl(e0)?, self.to_sql_impl(e3)?);
548        let sql = crate::check_ids!(e1, e2, base);
549        Ok(sql)
550    }
551
552    fn t_overlapped_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
553        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
554        Ok(format!(
555            "({0} > {2}) AND ({0} < {3}) AND ({1} > {3})",
556            self.to_sql_impl(e0)?,
557            self.to_sql_impl(e1)?,
558            self.to_sql_impl(e2)?,
559            self.to_sql_impl(e3)?
560        ))
561    }
562
563    fn t_overlaps_sql(&self, a: E, b: E) -> Result<String, MyError> {
564        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
565        Ok(format!(
566            "({0} < {2}) AND ({1} > {2}) AND ({1} < {3})",
567            self.to_sql_impl(e0)?,
568            self.to_sql_impl(e1)?,
569            self.to_sql_impl(e2)?,
570            self.to_sql_impl(e3)?
571        ))
572    }
573
574    fn t_started_by_sql(&self, a: E, b: E) -> Result<String, MyError> {
575        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
576        Ok(format!(
577            "({0} = {2}) AND ({1} > {3})",
578            self.to_sql_impl(e0)?,
579            self.to_sql_impl(e1)?,
580            self.to_sql_impl(e2)?,
581            self.to_sql_impl(e3)?
582        ))
583    }
584
585    fn t_starts_sql(&self, a: E, b: E) -> Result<String, MyError> {
586        let (e0, e1, e2, e3) = crate::unfold_intervals!(a, b);
587        Ok(format!(
588            "({0} = {2}) AND ({1} < {3})",
589            self.to_sql_impl(e0)?,
590            self.to_sql_impl(e1)?,
591            self.to_sql_impl(e2)?,
592            self.to_sql_impl(e3)?
593        ))
594    }
595}
596
/// Render a given string as a double-quoted SQL identifier.
///
/// * A string already enclosed in a _pair_ of double-quotes is returned as
///   is. A lone `"` character does not qualify: it is a 1-character name, not
///   an opening + closing quote.
/// * Any other string is wrapped in double-quotes, w/ every embedded
///   double-quote doubled so the result is one well-formed delimited
///   identifier.
fn double_quoted(s: String) -> String {
    // 2 characters at least are needed for an opening and a closing quote...
    let already_quoted = s.len() >= 2 && s.starts_with('"') && s.ends_with('"');
    if already_quoted {
        s
    } else {
        format!("\"{}\"", s.replace('"', "\"\""))
    }
}
606
607fn qstr_to_sql(qs: QString) -> Result<String, MyError> {
608    match qs.flags() {
609        0 => Ok(format!(r#"'{}' COLLATE "{PG_UNICODE}""#, qs.inner())),
610        1 => Ok(format!(r#"'{}' COLLATE "{CQL2_CI}""#, qs.inner())),
611        2 => Ok(format!(r#"'{}' COLLATE "{CQL2_AI}""#, qs.inner())),
612        3 => Ok(format!(r#"'{}' COLLATE "{CQL2_CAI}""#, qs.inner())),
613        x => {
614            let msg = format!("String w/ '{x}' flags has NO direct SQL representation");
615            Err(MyError::Runtime(msg.into()))
616        }
617    }
618}
619
/// Macro to generate a concrete [PGDataSource].
///
/// Caller must provide the following parameters:
/// * `$vis`: Visibility specifier of the generated artifacts; e.g. `pub(crate)`.
/// * `$name`: Prefix of the concrete data source structure name to materialize.
///   The final name will have a 'PG' suffix appended; eg. `Foo` -> `FooPG`.
/// * `$db_url`: Name of the _PostgreSQL_ database to connect to. It is passed
///   as the `db_name` argument of [`PGDataSource::from`], which appends it to
///   the configured server URL; it is NOT a complete connection URL.
/// * `$table`: Name of the table containing the features' data.
/// * `$feature`: `sqlx` _FromRow_ convertible structure to map database table
///   rows to _Features_.
///
/// The generated structure wraps a [PGDataSource] and implements `Display`
/// and `StreamableDS`. The expansion site must have `PGDataSource`,
/// `MyError`, `Resource`, `Expression`, `StreamableDS`, `Error` and the
/// `futures` stream extension traits in scope.
#[macro_export]
macro_rules! gen_pg_ds {
    ($vis:vis, $name:expr, $db_url:expr, $table:expr, $feature:expr) => {
        ::paste::paste! {
            /// Concrete PostgreSQL+PostGIS source.
            $vis struct [<$name PG>](PGDataSource);

            impl [<$name PG>] {
                /// Constructor.
                $vis async fn new() -> Result<Self, MyError> {
                    let ds = PGDataSource::from($db_url, $table).await?;
                    Ok(Self(ds))
                }

                /// Convert a row (aka Feature) to a generic Resource.
                $vis fn to_resource(r: $feature) -> Result<Resource, Box<dyn Error>> {
                    let row = $feature::try_from(r)?;
                    Ok(Resource::try_from(row)?)
                }

                /// Convenience method. Calls inner's similarly named method.
                $vis fn table(&self) -> &str {
                    self.0.table()
                }

                /// Return a reference to the inner model data source.
                $vis fn inner(&self) -> &PGDataSource {
                    &self.0
                }
            }

            impl ::core::fmt::Display for [<$name PG>] {
                fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                    write!(f, "{}PG({})", $name, $table)
                }
            }

            #[::async_trait::async_trait]
            impl StreamableDS for [<$name PG>] {
                type Item = $feature;
                type Err = MyError;

                async fn fetch(
                    &self
                ) -> Result<::futures::stream::BoxStream<'_, Result<$feature, MyError>>, MyError> {
                    // quote the table name, exactly like `fetch_where` does, so
                    // both queries resolve the same table whatever its case...
                    let sql = format!(r#"SELECT * FROM "{}";"#, self.table());
                    let safe_sql = ::sqlx::AssertSqlSafe(sql);
                    let it = sqlx::query_as::<_, $feature>(safe_sql)
                        .fetch(self.0.pool())
                        .map_err(MyError::SQL);
                    Ok(Box::pin(it))
                }

                async fn stream(
                    &self
                ) -> Result<::futures::stream::BoxStream<'_, Result<Resource, MyError>>, MyError> {
                    let rows = self.fetch().await?;
                    // convert each row; a failed conversion surfaces as an
                    // error item of the stream...
                    let resources = rows
                        .try_filter_map(|row| async move {
                            match Resource::try_from(row) {
                                Ok(x) => Ok(Some(x)),
                                Err(x) => Err(x),
                            }
                        })
                        .boxed();
                    Ok(resources)
                }

                async fn fetch_where(
                    &self,
                    exp: &Expression,
                ) -> Result<::futures::stream::BoxStream<'_, Result<$feature, MyError>>, MyError> {
                    let where_clause = self.0.to_sql(exp)?;
                    let sql = format!(r#"SELECT * FROM "{}" WHERE {};"#, self.table(), where_clause);
                    tracing::debug!("-- sql = {sql}");
                    let safe_sql = ::sqlx::AssertSqlSafe(sql);
                    let it = sqlx::query_as::<_, $feature>(safe_sql)
                        .fetch(self.0.pool())
                        .map_err(MyError::SQL);
                    Ok(Box::pin(it))
                }

                async fn stream_where(
                    &self,
                    exp: &Expression,
                ) -> Result<::futures::stream::BoxStream<'_, Result<Resource, MyError>>, MyError> {
                    let rows = self.fetch_where(exp).await?;
                    // convert each row; a failed conversion surfaces as an
                    // error item of the stream...
                    let resources = rows
                        .try_filter_map(|row| async move {
                            match Resource::try_from(row) {
                                Ok(x) => Ok(Some(x)),
                                Err(x) => Err(x),
                            }
                        })
                        .boxed();
                    Ok(resources)
                }
            }
        }
    };
}