Skip to main content

feldera_types/
program_schema.rs

1pub use feldera_ir::SourcePosition;
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5use std::fmt::Display;
6use std::hash::{Hash, Hasher};
7use utoipa::ToSchema;
8use utoipa::openapi::{ObjectBuilder, RefOr, Schema, SchemaType};
9
10#[cfg(feature = "testing")]
11use proptest::{collection::vec, prelude::any};
12
/// Converts a SQL identifier to its canonical form.
///
/// - A _quoted_ identifier (text wrapped in a pair of double quotes) is
///   case-sensitive: the surrounding quotes are dropped and the inner text is
///   returned verbatim. Nothing else is processed, e.g., escaped quotes are
///   not un-escaped.
/// - Anything else is interpreted as a case-insensitive identifier and is
///   returned in lowercase.
pub fn canonical_identifier(id: &str) -> String {
    // `strip_prefix` followed by `strip_suffix` only succeeds when there are
    // two distinct quote characters, so a lone `"` is not treated as quoted.
    let unquoted = id
        .strip_prefix('"')
        .and_then(|rest| rest.strip_suffix('"'));

    match unquoted {
        Some(inner) => inner.to_string(),
        None => id.to_lowercase(),
    }
}
27
/// An SQL identifier.
///
/// This struct is used to represent SQL identifiers in a canonical form.
/// We store table names or field names as identifiers in the schema.
// Equality, ordering and hashing are implemented by hand for this type rather
// than derived, so they are not plain field-wise comparisons.
#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct SqlIdentifier {
    // Identifier text without surrounding quotes. It is stored as given and
    // only lowercased on demand by `name()` when `case_sensitive` is false.
    #[cfg_attr(feature = "testing", proptest(regex = "relation1|relation2|relation3"))]
    name: String,
    // `true` for identifiers that were quoted in SQL (see the `From` impl).
    pub case_sensitive: bool,
}
39
40impl SqlIdentifier {
41    pub fn new<S: AsRef<str>>(name: S, case_sensitive: bool) -> Self {
42        Self {
43            name: name.as_ref().to_string(),
44            case_sensitive,
45        }
46    }
47
48    /// Return the name of the identifier in canonical form.
49    /// The result is the true case-sensitive identifying name of the table,
50    /// and can be used for example to detect duplicate table names.
51    ///
52    /// Example return values for this function:
53    /// - `CREATE TABLE t1` -> `t1`
54    /// - `CREATE TABLE T1` -> `t1`
55    /// - `CREATE TABLE "t1"` -> `t1`
56    /// - `CREATE TABLE "T1"` -> `T1`
57    pub fn name(&self) -> String {
58        if self.case_sensitive {
59            self.name.clone()
60        } else {
61            self.name.to_lowercase()
62        }
63    }
64
65    /// Return the name of the identifier as it appeared originally in SQL.
66    /// This method should only be used for log or error messages as it is what
67    /// the user originally wrote, however it should not be used for identification
68    /// or disambiguation (use `name()` for that instead).
69    ///
70    /// Example return values for this function:
71    /// - `CREATE TABLE t1` -> `t1`
72    /// - `CREATE TABLE T1` -> `T1`
73    /// - `CREATE TABLE "t1"` -> `"t1"`
74    /// - `CREATE TABLE "T1"` -> `"T1"`
75    pub fn sql_name(&self) -> String {
76        if self.case_sensitive {
77            format!("\"{}\"", self.name)
78        } else {
79            self.name.clone()
80        }
81    }
82}
83
84impl Hash for SqlIdentifier {
85    fn hash<H: Hasher>(&self, state: &mut H) {
86        self.name().hash(state);
87    }
88}
89
90impl PartialEq for SqlIdentifier {
91    fn eq(&self, other: &Self) -> bool {
92        match (self.case_sensitive, other.case_sensitive) {
93            (true, true) => self.name == other.name,
94            (false, false) => self.name.to_lowercase() == other.name.to_lowercase(),
95            (true, false) => self.name == other.name,
96            (false, true) => self.name == other.name,
97        }
98    }
99}
100
101impl Ord for SqlIdentifier {
102    fn cmp(&self, other: &Self) -> Ordering {
103        self.name().cmp(&other.name())
104    }
105}
106
107impl PartialOrd for SqlIdentifier {
108    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
109        Some(self.cmp(other))
110    }
111}
112
113impl<S: AsRef<str>> PartialEq<S> for SqlIdentifier {
114    fn eq(&self, other: &S) -> bool {
115        self == &SqlIdentifier::from(other.as_ref())
116    }
117}
118
// Marker impl: declares the hand-written `PartialEq` above to be a full
// equivalence relation.
impl Eq for SqlIdentifier {}
120
121impl<S: AsRef<str>> From<S> for SqlIdentifier {
122    fn from(name: S) -> Self {
123        if name.as_ref().starts_with('"')
124            && name.as_ref().ends_with('"')
125            && name.as_ref().len() >= 2
126        {
127            Self {
128                name: name.as_ref()[1..name.as_ref().len() - 1].to_string(),
129                case_sensitive: true,
130            }
131        } else {
132            Self::new(name, false)
133        }
134    }
135}
136
137impl From<SqlIdentifier> for String {
138    fn from(id: SqlIdentifier) -> String {
139        id.name()
140    }
141}
142
143impl From<&SqlIdentifier> for String {
144    fn from(id: &SqlIdentifier) -> String {
145        id.name()
146    }
147}
148
149impl Display for SqlIdentifier {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        write!(f, "{}", self.name())
152    }
153}
154
/// A struct containing the tables (inputs) and views for a program.
///
/// Parse from the JSON data-type of the DDL generated by the SQL compiler.
#[derive(Default, Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct ProgramSchema {
    // Input relations. Under the `testing` feature, proptest generates a
    // vector with a length in `0..2` for this field.
    #[cfg_attr(
        feature = "testing",
        proptest(strategy = "vec(any::<Relation>(), 0..2)")
    )]
    pub inputs: Vec<Relation>,
    // Output relations; generated the same way as `inputs` under `testing`.
    #[cfg_attr(
        feature = "testing",
        proptest(strategy = "vec(any::<Relation>(), 0..2)")
    )]
    pub outputs: Vec<Relation>,
}
172
173impl ProgramSchema {
174    pub fn relations_with_lateness(&self) -> Vec<SqlIdentifier> {
175        self.inputs
176            .iter()
177            .chain(self.outputs.iter())
178            .filter(|rel| rel.has_lateness())
179            .map(|rel| rel.name.clone())
180            .collect()
181    }
182}
183
/// A version of `ProgramSchema` that contains only the name and properties of the relations
/// by making use of `RelationPropertiesOnly` instead of `Relation` for its inputs and outputs.
/// This is used to avoid parsing the entire `Relation` object, including SQL schema,
/// which can change across runtime versions.
#[derive(Debug, Deserialize)]
pub struct ProgramSchemaPropertiesOnly {
    /// Input relations; an absent `inputs` key deserializes to an empty list.
    #[serde(default)]
    pub inputs: Vec<RelationPropertiesOnly>,
    /// Output relations; an absent `outputs` key deserializes to an empty list.
    #[serde(default)]
    pub outputs: Vec<RelationPropertiesOnly>,
}
195
// Value of one entry in a relation's `properties` map (see `Relation`).
// NOTE(review): the two positions presumably locate the property's key and
// value in the SQL source text — confirm against `feldera_ir::SourcePosition`.
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct PropertyValue {
    // The property value as a string.
    pub value: String,
    pub key_position: SourcePosition,
    pub value_position: SourcePosition,
}
203
/// A SQL table or view. It has a name and a list of fields.
///
/// Matches the Calcite JSON format.
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct Relation {
    // Flattened: the identifier's `name` and `case_sensitive` keys appear
    // directly in the relation's JSON object rather than in a nested object.
    #[serde(flatten)]
    pub name: SqlIdentifier,
    // Columns of the relation. Always empty in proptest-generated values.
    #[cfg_attr(feature = "testing", proptest(value = "Vec::new()"))]
    pub fields: Vec<Field>,
    // Defaults to `false` when the key is absent.
    #[serde(default)]
    pub materialized: bool,
    // Properties keyed by name; defaults to an empty map when absent.
    #[serde(default)]
    pub properties: BTreeMap<String, PropertyValue>,
    // Primary-key column names, if any. `with_primary_key` fills this with
    // canonical identifier names.
    pub primary_key: Option<Vec<String>>,
}
220
221impl Relation {
222    pub fn empty() -> Self {
223        Self {
224            name: SqlIdentifier::from("".to_string()),
225            fields: Vec::new(),
226            materialized: false,
227            properties: BTreeMap::new(),
228            primary_key: None,
229        }
230    }
231
232    pub fn new(
233        name: SqlIdentifier,
234        fields: Vec<Field>,
235        materialized: bool,
236        properties: BTreeMap<String, PropertyValue>,
237    ) -> Self {
238        Self {
239            name,
240            fields,
241            materialized,
242            properties,
243            primary_key: None,
244        }
245    }
246
247    /// Lookup field by name.
248    pub fn field(&self, name: &str) -> Option<&Field> {
249        let name = canonical_identifier(name);
250        self.fields.iter().find(|f| f.name == name)
251    }
252
253    pub fn has_lateness(&self) -> bool {
254        self.fields.iter().any(|f| f.lateness.is_some())
255    }
256
257    pub fn get_property(&self, name: &str) -> Option<&str> {
258        self.properties.get(name).map(|p| p.value.as_str())
259    }
260
261    pub fn with_primary_key<'a>(
262        mut self,
263        primary_key: impl IntoIterator<Item = &'a SqlIdentifier>,
264    ) -> Self {
265        self.primary_key = Some(primary_key.into_iter().map(|id| id.name()).collect());
266        self
267    }
268}
269
/// A version of `Relation` that only contains the name and properties.
/// This is used to avoid parsing the entire `Relation` object, including
/// SQL schema, which can change across runtime versions.
#[derive(Debug, Deserialize)]
pub struct RelationPropertiesOnly {
    /// Relation name; flattened, so the identifier's keys are read directly
    /// from the relation's JSON object.
    #[serde(flatten)]
    pub name: SqlIdentifier,
    /// Properties keyed by name; defaults to an empty map when absent.
    #[serde(default)]
    pub properties: BTreeMap<String, PropertyValue>,
}
280
/// A SQL field.
///
/// Matches the SQL compiler JSON format.
// `Deserialize` is intentionally not derived: it is implemented by hand for
// this type, because type options may appear inline in the field object.
#[derive(Serialize, ToSchema, Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct Field {
    // Flattened: the identifier's keys are serialized directly into the
    // field's JSON object.
    #[serde(flatten)]
    pub name: SqlIdentifier,
    // Type description of the column.
    pub columntype: ColumnType,
    // Lateness annotation, if any (checked by `Relation::has_lateness`).
    pub lateness: Option<String>,
    pub default: Option<String>,
    pub unused: bool,
    pub watermark: Option<String>,
}
295
296impl Field {
297    pub fn new(name: SqlIdentifier, columntype: ColumnType) -> Self {
298        Self {
299            name,
300            columntype,
301            lateness: None,
302            default: None,
303            unused: false,
304            watermark: None,
305        }
306    }
307
308    pub fn with_lateness(mut self, lateness: &str) -> Self {
309        self.lateness = Some(lateness.to_string());
310        self
311    }
312
313    pub fn with_unused(mut self, unused: bool) -> Self {
314        self.unused = unused;
315        self
316    }
317}
318
319/// Thanks to the brain-dead Calcite schema, if we are deserializing a field, the type options
320/// end up inside the Field struct.
321///
322/// This helper struct is used to deserialize the Field struct.
323impl<'de> Deserialize<'de> for Field {
324    fn deserialize<D>(deserializer: D) -> Result<Field, D::Error>
325    where
326        D: Deserializer<'de>,
327    {
328        const fn default_is_struct() -> Option<SqlType> {
329            Some(SqlType::Struct)
330        }
331
332        #[derive(Debug, Clone, Deserialize)]
333        struct FieldHelper {
334            name: Option<String>,
335            #[serde(default)]
336            case_sensitive: bool,
337            columntype: Option<ColumnType>,
338            #[serde(rename = "type")]
339            #[serde(default = "default_is_struct")]
340            typ: Option<SqlType>,
341            nullable: Option<bool>,
342            precision: Option<i64>,
343            scale: Option<i64>,
344            component: Option<Box<ColumnType>>,
345            fields: Option<serde_json::Value>,
346            key: Option<Box<ColumnType>>,
347            value: Option<Box<ColumnType>>,
348            default: Option<String>,
349            #[serde(default)]
350            unused: bool,
351            lateness: Option<String>,
352            watermark: Option<String>,
353        }
354
355        fn helper_to_field(helper: FieldHelper) -> Field {
356            let columntype = if let Some(ctype) = helper.columntype {
357                ctype
358            } else if let Some(serde_json::Value::Array(fields)) = helper.fields {
359                let fields = fields
360                    .into_iter()
361                    .map(|field| {
362                        let field: FieldHelper = serde_json::from_value(field).unwrap();
363                        helper_to_field(field)
364                    })
365                    .collect::<Vec<Field>>();
366
367                ColumnType {
368                    typ: helper.typ.unwrap_or(SqlType::Null),
369                    nullable: helper.nullable.unwrap_or(false),
370                    precision: helper.precision,
371                    scale: helper.scale,
372                    component: helper.component,
373                    fields: Some(fields),
374                    key: None,
375                    value: None,
376                }
377            } else if let Some(serde_json::Value::Object(obj)) = helper.fields {
378                serde_json::from_value(serde_json::Value::Object(obj))
379                    .expect("Failed to deserialize object")
380            } else {
381                ColumnType {
382                    typ: helper.typ.unwrap_or(SqlType::Null),
383                    nullable: helper.nullable.unwrap_or(false),
384                    precision: helper.precision,
385                    scale: helper.scale,
386                    component: helper.component,
387                    fields: None,
388                    key: helper.key,
389                    value: helper.value,
390                }
391            };
392
393            Field {
394                name: SqlIdentifier::new(helper.name.unwrap(), helper.case_sensitive),
395                columntype,
396                default: helper.default,
397                unused: helper.unused,
398                lateness: helper.lateness,
399                watermark: helper.watermark,
400            }
401        }
402
403        let helper = FieldHelper::deserialize(deserializer)?;
404        Ok(helper_to_field(helper))
405    }
406}
407
/// The specified units for SQL Interval types.
///
/// `INTERVAL 1 DAY`, `INTERVAL 1 DAY TO HOUR`, `INTERVAL 1 DAY TO MINUTE`,
/// would yield `Day`, `DayToHour`, `DayToMinute`, as the `IntervalUnit` respectively.
// No serde derives here: the unit is encoded as part of the `INTERVAL_*`
// string produced and parsed by `SqlType`'s hand-written serde impls.
#[derive(ToSchema, Debug, Eq, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub enum IntervalUnit {
    /// Unit for `INTERVAL ... DAY`.
    Day,
    /// Unit for `INTERVAL ... DAY TO HOUR`.
    DayToHour,
    /// Unit for `INTERVAL ... DAY TO MINUTE`.
    DayToMinute,
    /// Unit for `INTERVAL ... DAY TO SECOND`.
    DayToSecond,
    /// Unit for `INTERVAL ... HOUR`.
    Hour,
    /// Unit for `INTERVAL ... HOUR TO MINUTE`.
    HourToMinute,
    /// Unit for `INTERVAL ... HOUR TO SECOND`.
    HourToSecond,
    /// Unit for `INTERVAL ... MINUTE`.
    Minute,
    /// Unit for `INTERVAL ... MINUTE TO SECOND`.
    MinuteToSecond,
    /// Unit for `INTERVAL ... MONTH`.
    Month,
    /// Unit for `INTERVAL ... SECOND`.
    Second,
    /// Unit for `INTERVAL ... YEAR`.
    Year,
    /// Unit for `INTERVAL ... YEAR TO MONTH`.
    YearToMonth,
}
442
/// The available SQL column types.
///
/// The OpenAPI schema for this type is implemented by hand (see the `ToSchema`
/// impl below) rather than derived, because `SqlType` has hand-written
/// `Serialize`/`Deserialize` impls that emit Feldera's uppercase wire spellings
/// (`"BIGINT"`, `"INTEGER"`, `"INTERVAL_DAY"`, …) instead of the Rust variant
/// names. These wire strings are the platform's encoding of the types on the
/// JSON API; they are not valid SQL type syntax (SQL writes `INTERVAL DAY`, with
/// a space). A derived schema would advertise the variant names and thus
/// disagree with the bytes actually on the wire, breaking generated clients.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub enum SqlType {
    /// SQL `BOOLEAN` type.
    Boolean,
    /// SQL `TINYINT` type.
    TinyInt,
    /// SQL `SMALLINT` or `INT2` type.
    SmallInt,
    /// SQL `INTEGER`, `INT`, `SIGNED`, `INT4` type.
    Int,
    /// SQL `BIGINT` or `INT64` type.
    BigInt,
    /// SQL `TINYINT UNSIGNED` type.
    UTinyInt,
    /// SQL `SMALLINT UNSIGNED` type.
    USmallInt,
    /// SQL `UNSIGNED`, `INTEGER UNSIGNED`, `INT UNSIGNED` type.
    UInt,
    /// SQL `BIGINT UNSIGNED` type.
    UBigInt,
    /// SQL `REAL` or `FLOAT4` or `FLOAT32` type.
    Real,
    /// SQL `DOUBLE` or `FLOAT8` or `FLOAT64` type.
    Double,
    /// SQL `DECIMAL` or `DEC` or `NUMERIC` type.
    Decimal,
    /// SQL `CHAR(n)` or `CHARACTER(n)` type.
    Char,
    /// SQL `VARCHAR`, `CHARACTER VARYING`, `TEXT`, or `STRING` type.
    Varchar,
    /// SQL `BINARY(n)` type.
    Binary,
    /// SQL `VARBINARY` or `BYTEA` type.
    Varbinary,
    /// SQL `TIME` type.
    Time,
    /// SQL `DATE` type.
    Date,
    /// SQL `TIMESTAMP` type.
    Timestamp,
    /// SQL `TIMESTAMP WITH TIME ZONE` type.
    TimestampTz,
    /// SQL `INTERVAL ... X` type where `X` is a unit.
    ///
    /// On the wire the unit is folded into the type string, e.g.
    /// `INTERVAL_DAY_HOUR` for `IntervalUnit::DayToHour`.
    Interval(IntervalUnit),
    /// SQL `ARRAY` type.
    Array,
    /// A complex SQL struct type (`CREATE TYPE x ...`).
    Struct,
    /// SQL `MAP` type.
    Map,
    /// SQL `NULL` type.
    Null,
    /// SQL `UUID` type.
    Uuid,
    /// SQL `VARIANT` type.
    Variant,
}
511
/// Every wire string `SqlType` can serialize to, in variant order.
///
/// This list supplies the enum values of the hand-written OpenAPI schema for
/// `SqlType`, so that generated clients see exactly the values that appear on
/// the wire. The strings are repeated in `SqlType::as_wire_str` (used by
/// `Serialize`), so the two must be kept in sync: when a variant is added or
/// renamed, update both. The `sql_type_schema_matches_serialization` test
/// guards that every variant's serialized form is listed here.
const SQL_TYPE_VALUES: &[&str] = &[
    "BOOLEAN",
    "TINYINT",
    "SMALLINT",
    "INTEGER",
    "BIGINT",
    "UTINYINT",
    "USMALLINT",
    "UINTEGER",
    "UBIGINT",
    "REAL",
    "DOUBLE",
    "DECIMAL",
    "CHAR",
    "VARCHAR",
    "BINARY",
    "VARBINARY",
    "TIME",
    "DATE",
    "TIMESTAMP",
    "TIMESTAMP_TZ",
    // One entry per `IntervalUnit` variant.
    "INTERVAL_DAY",
    "INTERVAL_DAY_HOUR",
    "INTERVAL_DAY_MINUTE",
    "INTERVAL_DAY_SECOND",
    "INTERVAL_HOUR",
    "INTERVAL_HOUR_MINUTE",
    "INTERVAL_HOUR_SECOND",
    "INTERVAL_MINUTE",
    "INTERVAL_MINUTE_SECOND",
    "INTERVAL_MONTH",
    "INTERVAL_SECOND",
    "INTERVAL_YEAR",
    "INTERVAL_YEAR_MONTH",
    "ARRAY",
    "STRUCT",
    "MAP",
    "NULL",
    "UUID",
    "VARIANT",
];
560
561impl SqlType {
562    /// The uppercase wire spelling of this type — the canonical serialized form
563    /// the platform emits on the JSON API (`"BIGINT"`, `"INTEGER"`,
564    /// `"INTERVAL_DAY"`, …). Note these are wire strings, not valid SQL type
565    /// syntax.
566    pub fn as_wire_str(&self) -> &'static str {
567        match self {
568            SqlType::Boolean => "BOOLEAN",
569            SqlType::TinyInt => "TINYINT",
570            SqlType::SmallInt => "SMALLINT",
571            SqlType::Int => "INTEGER",
572            SqlType::BigInt => "BIGINT",
573            SqlType::UTinyInt => "UTINYINT",
574            SqlType::USmallInt => "USMALLINT",
575            SqlType::UInt => "UINTEGER",
576            SqlType::UBigInt => "UBIGINT",
577            SqlType::Real => "REAL",
578            SqlType::Double => "DOUBLE",
579            SqlType::Decimal => "DECIMAL",
580            SqlType::Char => "CHAR",
581            SqlType::Varchar => "VARCHAR",
582            SqlType::Binary => "BINARY",
583            SqlType::Varbinary => "VARBINARY",
584            SqlType::Time => "TIME",
585            SqlType::Date => "DATE",
586            SqlType::Timestamp => "TIMESTAMP",
587            SqlType::TimestampTz => "TIMESTAMP_TZ",
588            SqlType::Interval(interval_unit) => match interval_unit {
589                IntervalUnit::Day => "INTERVAL_DAY",
590                IntervalUnit::DayToHour => "INTERVAL_DAY_HOUR",
591                IntervalUnit::DayToMinute => "INTERVAL_DAY_MINUTE",
592                IntervalUnit::DayToSecond => "INTERVAL_DAY_SECOND",
593                IntervalUnit::Hour => "INTERVAL_HOUR",
594                IntervalUnit::HourToMinute => "INTERVAL_HOUR_MINUTE",
595                IntervalUnit::HourToSecond => "INTERVAL_HOUR_SECOND",
596                IntervalUnit::Minute => "INTERVAL_MINUTE",
597                IntervalUnit::MinuteToSecond => "INTERVAL_MINUTE_SECOND",
598                IntervalUnit::Month => "INTERVAL_MONTH",
599                IntervalUnit::Second => "INTERVAL_SECOND",
600                IntervalUnit::Year => "INTERVAL_YEAR",
601                IntervalUnit::YearToMonth => "INTERVAL_YEAR_MONTH",
602            },
603            SqlType::Array => "ARRAY",
604            SqlType::Struct => "STRUCT",
605            SqlType::Map => "MAP",
606            SqlType::Null => "NULL",
607            SqlType::Uuid => "UUID",
608            SqlType::Variant => "VARIANT",
609        }
610    }
611}
612
613impl ToSchema<'_> for SqlType {
614    fn schema() -> (&'static str, RefOr<Schema>) {
615        (
616            "SqlType",
617            RefOr::T(Schema::Object(
618                ObjectBuilder::new()
619                    .schema_type(SchemaType::String)
620                    .description(Some(
621                        "The available SQL column type names. Each value is the platform's wire \
622                         encoding of the type (e.g. `BIGINT`, `INTEGER`, `INTERVAL_DAY`), not \
623                         valid SQL type syntax.",
624                    ))
625                    .enum_values(Some(SQL_TYPE_VALUES.iter().copied()))
626                    .build(),
627            )),
628        )
629    }
630}
631
632impl Display for SqlType {
633    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
634        f.write_str(&serde_json::to_string(self).unwrap())
635    }
636}
637
638impl<'de> Deserialize<'de> for SqlType {
639    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
640    where
641        D: Deserializer<'de>,
642    {
643        let value: String = Deserialize::deserialize(deserializer)?;
644        match value.to_lowercase().as_str() {
645            "interval_day" => Ok(SqlType::Interval(IntervalUnit::Day)),
646            "interval_day_hour" => Ok(SqlType::Interval(IntervalUnit::DayToHour)),
647            "interval_day_minute" => Ok(SqlType::Interval(IntervalUnit::DayToMinute)),
648            "interval_day_second" => Ok(SqlType::Interval(IntervalUnit::DayToSecond)),
649            "interval_hour" => Ok(SqlType::Interval(IntervalUnit::Hour)),
650            "interval_hour_minute" => Ok(SqlType::Interval(IntervalUnit::HourToMinute)),
651            "interval_hour_second" => Ok(SqlType::Interval(IntervalUnit::HourToSecond)),
652            "interval_minute" => Ok(SqlType::Interval(IntervalUnit::Minute)),
653            "interval_minute_second" => Ok(SqlType::Interval(IntervalUnit::MinuteToSecond)),
654            "interval_month" => Ok(SqlType::Interval(IntervalUnit::Month)),
655            "interval_second" => Ok(SqlType::Interval(IntervalUnit::Second)),
656            "interval_year" => Ok(SqlType::Interval(IntervalUnit::Year)),
657            "interval_year_month" => Ok(SqlType::Interval(IntervalUnit::YearToMonth)),
658            "boolean" => Ok(SqlType::Boolean),
659            "tinyint" => Ok(SqlType::TinyInt),
660            "smallint" => Ok(SqlType::SmallInt),
661            "integer" => Ok(SqlType::Int),
662            "bigint" => Ok(SqlType::BigInt),
663            "utinyint" => Ok(SqlType::UTinyInt),
664            "usmallint" => Ok(SqlType::USmallInt),
665            "uinteger" => Ok(SqlType::UInt),
666            "ubigint" => Ok(SqlType::UBigInt),
667            "real" => Ok(SqlType::Real),
668            "double" => Ok(SqlType::Double),
669            "decimal" => Ok(SqlType::Decimal),
670            "char" => Ok(SqlType::Char),
671            "varchar" => Ok(SqlType::Varchar),
672            "binary" => Ok(SqlType::Binary),
673            "varbinary" => Ok(SqlType::Varbinary),
674            "variant" => Ok(SqlType::Variant),
675            "time" => Ok(SqlType::Time),
676            "date" => Ok(SqlType::Date),
677            "timestamp" => Ok(SqlType::Timestamp),
678            "timestamp_tz" => Ok(SqlType::TimestampTz),
679            "array" => Ok(SqlType::Array),
680            "struct" => Ok(SqlType::Struct),
681            "map" => Ok(SqlType::Map),
682            "null" => Ok(SqlType::Null),
683            "uuid" => Ok(SqlType::Uuid),
684            _ => Err(serde::de::Error::custom(format!(
685                "Unknown SQL type: {}",
686                value
687            ))),
688        }
689    }
690}
691
692impl Serialize for SqlType {
693    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
694    where
695        S: Serializer,
696    {
697        serializer.serialize_str(self.as_wire_str())
698    }
699}
700
701impl SqlType {
702    /// Is this a string type?
703    pub fn is_string(&self) -> bool {
704        matches!(self, Self::Char | Self::Varchar)
705    }
706
707    pub fn is_varchar(&self) -> bool {
708        matches!(self, Self::Varchar)
709    }
710
711    pub fn is_varbinary(&self) -> bool {
712        matches!(self, Self::Varbinary)
713    }
714}
715
/// It so happens that when the type field is missing in the Calcite schema, it's a struct,
/// so we use it as the default.
///
/// Used as the serde default for `ColumnType::typ`.
const fn default_is_struct() -> SqlType {
    SqlType::Struct
}
721
/// A SQL column type description.
///
/// Matches the Calcite JSON format.
#[derive(Serialize, Deserialize, ToSchema, Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub struct ColumnType {
    /// Identifier for the type (e.g., `VARCHAR`, `BIGINT`, `ARRAY` etc.)
    // Serialized under the key `type`; a missing key deserializes to
    // `SqlType::Struct` (see `default_is_struct`).
    #[serde(rename = "type")]
    #[serde(default = "default_is_struct")]
    pub typ: SqlType,
    /// Does the type accept NULL values?
    pub nullable: bool,
    /// Precision of the type.
    ///
    /// # Examples
    /// - `VARCHAR` sets precision to `-1`.
    /// - `VARCHAR(255)` sets precision to `255`.
    /// - `BIGINT`, `DATE`, `FLOAT`, `DOUBLE`, `GEOMETRY`, etc. sets precision
    ///   to None
    /// - `TIME`, `TIMESTAMP` set precision to `0`.
    pub precision: Option<i64>,
    /// The scale of the type.
    ///
    /// # Example
    /// - `DECIMAL(1,2)` sets scale to `2`.
    pub scale: Option<i64>,
    /// A component of the type (if available).
    ///
    /// This is in a `Box` because it makes the type recursive.
    ///
    /// For example, this would specify the `VARCHAR(20)` in the `VARCHAR(20)
    /// ARRAY` type.
    // Always `None` in proptest-generated values.
    #[cfg_attr(feature = "testing", proptest(value = "None"))]
    pub component: Option<Box<ColumnType>>,
    /// The fields of the type (if available).
    ///
    /// For example this would specify the fields of a `CREATE TYPE` construct.
    ///
    /// ```sql
    /// CREATE TYPE person_typ AS (
    ///   firstname       VARCHAR(30),
    ///   lastname        VARCHAR(30),
    ///   address         ADDRESS_TYP
    /// );
    /// ```
    ///
    /// Would lead to the following `fields` value:
    ///
    /// ```text
    /// [
    ///  Field { name: "firstname", ... },
    ///  Field { name: "lastname", ... },
    ///  Field { name: "address", columntype: ColumnType { fields: [ ... ], ... }, ... }
    /// ]
    /// ```
    // Always `Some(vec![])` in proptest-generated values.
    #[cfg_attr(feature = "testing", proptest(value = "Some(Vec::new())"))]
    pub fields: Option<Vec<Field>>,
    /// Key type; must be set when `type == "MAP"`.
    #[cfg_attr(feature = "testing", proptest(value = "None"))]
    pub key: Option<Box<ColumnType>>,
    /// Value type; must be set when `type == "MAP"`.
    #[cfg_attr(feature = "testing", proptest(value = "None"))]
    pub value: Option<Box<ColumnType>>,
}
786
787impl ColumnType {
788    pub fn boolean(nullable: bool) -> Self {
789        ColumnType {
790            typ: SqlType::Boolean,
791            nullable,
792            precision: None,
793            scale: None,
794            component: None,
795            fields: None,
796            key: None,
797            value: None,
798        }
799    }
800
801    pub fn uuid(nullable: bool) -> Self {
802        ColumnType {
803            typ: SqlType::Uuid,
804            nullable,
805            precision: None,
806            scale: None,
807            component: None,
808            fields: None,
809            key: None,
810            value: None,
811        }
812    }
813
814    pub fn tinyint(nullable: bool) -> Self {
815        ColumnType {
816            typ: SqlType::TinyInt,
817            nullable,
818            precision: None,
819            scale: None,
820            component: None,
821            fields: None,
822            key: None,
823            value: None,
824        }
825    }
826
827    pub fn smallint(nullable: bool) -> Self {
828        ColumnType {
829            typ: SqlType::SmallInt,
830            nullable,
831            precision: None,
832            scale: None,
833            component: None,
834            fields: None,
835            key: None,
836            value: None,
837        }
838    }
839
840    pub fn int(nullable: bool) -> Self {
841        ColumnType {
842            typ: SqlType::Int,
843            nullable,
844            precision: None,
845            scale: None,
846            component: None,
847            fields: None,
848            key: None,
849            value: None,
850        }
851    }
852
853    pub fn bigint(nullable: bool) -> Self {
854        ColumnType {
855            typ: SqlType::BigInt,
856            nullable,
857            precision: None,
858            scale: None,
859            component: None,
860            fields: None,
861            key: None,
862            value: None,
863        }
864    }
865
866    pub fn utinyint(nullable: bool) -> Self {
867        ColumnType {
868            typ: SqlType::UTinyInt,
869            nullable,
870            precision: None,
871            scale: None,
872            component: None,
873            fields: None,
874            key: None,
875            value: None,
876        }
877    }
878
879    pub fn usmallint(nullable: bool) -> Self {
880        ColumnType {
881            typ: SqlType::USmallInt,
882            nullable,
883            precision: None,
884            scale: None,
885            component: None,
886            fields: None,
887            key: None,
888            value: None,
889        }
890    }
891
892    pub fn uint(nullable: bool) -> Self {
893        ColumnType {
894            typ: SqlType::UInt,
895            nullable,
896            precision: None,
897            scale: None,
898            component: None,
899            fields: None,
900            key: None,
901            value: None,
902        }
903    }
904
905    pub fn ubigint(nullable: bool) -> Self {
906        ColumnType {
907            typ: SqlType::UBigInt,
908            nullable,
909            precision: None,
910            scale: None,
911            component: None,
912            fields: None,
913            key: None,
914            value: None,
915        }
916    }
917
918    pub fn double(nullable: bool) -> Self {
919        ColumnType {
920            typ: SqlType::Double,
921            nullable,
922            precision: None,
923            scale: None,
924            component: None,
925            fields: None,
926            key: None,
927            value: None,
928        }
929    }
930
931    pub fn real(nullable: bool) -> Self {
932        ColumnType {
933            typ: SqlType::Real,
934            nullable,
935            precision: None,
936            scale: None,
937            component: None,
938            fields: None,
939            key: None,
940            value: None,
941        }
942    }
943
944    pub fn decimal(precision: i64, scale: i64, nullable: bool) -> Self {
945        ColumnType {
946            typ: SqlType::Decimal,
947            nullable,
948            precision: Some(precision),
949            scale: Some(scale),
950            component: None,
951            fields: None,
952            key: None,
953            value: None,
954        }
955    }
956
957    pub fn varchar(nullable: bool) -> Self {
958        ColumnType {
959            typ: SqlType::Varchar,
960            nullable,
961            precision: None,
962            scale: None,
963            component: None,
964            fields: None,
965            key: None,
966            value: None,
967        }
968    }
969
970    pub fn varbinary(nullable: bool) -> Self {
971        ColumnType {
972            typ: SqlType::Varbinary,
973            nullable,
974            precision: None,
975            scale: None,
976            component: None,
977            fields: None,
978            key: None,
979            value: None,
980        }
981    }
982
983    pub fn fixed(width: i64, nullable: bool) -> Self {
984        ColumnType {
985            typ: SqlType::Binary,
986            nullable,
987            precision: Some(width),
988            scale: None,
989            component: None,
990            fields: None,
991            key: None,
992            value: None,
993        }
994    }
995
996    pub fn date(nullable: bool) -> Self {
997        ColumnType {
998            typ: SqlType::Date,
999            nullable,
1000            precision: None,
1001            scale: None,
1002            component: None,
1003            fields: None,
1004            key: None,
1005            value: None,
1006        }
1007    }
1008
1009    pub fn time(nullable: bool) -> Self {
1010        ColumnType {
1011            typ: SqlType::Time,
1012            nullable,
1013            precision: None,
1014            scale: None,
1015            component: None,
1016            fields: None,
1017            key: None,
1018            value: None,
1019        }
1020    }
1021
1022    pub fn timestamp(nullable: bool) -> Self {
1023        ColumnType {
1024            typ: SqlType::Timestamp,
1025            nullable,
1026            precision: None,
1027            scale: None,
1028            component: None,
1029            fields: None,
1030            key: None,
1031            value: None,
1032        }
1033    }
1034
1035    pub fn timestamp_tz(nullable: bool) -> Self {
1036        ColumnType {
1037            typ: SqlType::TimestampTz,
1038            nullable,
1039            precision: None,
1040            scale: None,
1041            component: None,
1042            fields: None,
1043            key: None,
1044            value: None,
1045        }
1046    }
1047
1048    pub fn variant(nullable: bool) -> Self {
1049        ColumnType {
1050            typ: SqlType::Variant,
1051            nullable,
1052            precision: None,
1053            scale: None,
1054            component: None,
1055            fields: None,
1056            key: None,
1057            value: None,
1058        }
1059    }
1060
1061    pub fn array(nullable: bool, element: ColumnType) -> Self {
1062        ColumnType {
1063            typ: SqlType::Array,
1064            nullable,
1065            precision: None,
1066            scale: None,
1067            component: Some(Box::new(element)),
1068            fields: None,
1069            key: None,
1070            value: None,
1071        }
1072    }
1073
1074    pub fn structure(nullable: bool, fields: &[Field]) -> Self {
1075        ColumnType {
1076            typ: SqlType::Struct,
1077            nullable,
1078            precision: None,
1079            scale: None,
1080            component: None,
1081            fields: Some(fields.to_vec()),
1082            key: None,
1083            value: None,
1084        }
1085    }
1086
1087    pub fn map(nullable: bool, key: ColumnType, val: ColumnType) -> Self {
1088        ColumnType {
1089            typ: SqlType::Map,
1090            nullable,
1091            precision: None,
1092            scale: None,
1093            component: None,
1094            fields: None,
1095            key: Some(Box::new(key)),
1096            value: Some(Box::new(val)),
1097        }
1098    }
1099
1100    pub fn is_integral_type(&self) -> bool {
1101        matches!(
1102            &self.typ,
1103            SqlType::TinyInt
1104                | SqlType::SmallInt
1105                | SqlType::Int
1106                | SqlType::BigInt
1107                | SqlType::UTinyInt
1108                | SqlType::USmallInt
1109                | SqlType::UInt
1110                | SqlType::UBigInt
1111        )
1112    }
1113
1114    pub fn is_fp_type(&self) -> bool {
1115        matches!(&self.typ, SqlType::Double | SqlType::Real)
1116    }
1117
1118    pub fn is_decimal_type(&self) -> bool {
1119        matches!(&self.typ, SqlType::Decimal)
1120    }
1121
1122    pub fn is_numeric_type(&self) -> bool {
1123        self.is_integral_type() || self.is_fp_type() || self.is_decimal_type()
1124    }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129    use super::{IntervalUnit, SqlIdentifier};
1130    use crate::program_schema::{ColumnType, Field, SQL_TYPE_VALUES, SqlType};
1131
1132    /// Every `SqlType` variant, with intervals expanded to one entry per unit.
1133    fn all_sql_types() -> Vec<SqlType> {
1134        let mut types = vec![
1135            SqlType::Boolean,
1136            SqlType::TinyInt,
1137            SqlType::SmallInt,
1138            SqlType::Int,
1139            SqlType::BigInt,
1140            SqlType::UTinyInt,
1141            SqlType::USmallInt,
1142            SqlType::UInt,
1143            SqlType::UBigInt,
1144            SqlType::Real,
1145            SqlType::Double,
1146            SqlType::Decimal,
1147            SqlType::Char,
1148            SqlType::Varchar,
1149            SqlType::Binary,
1150            SqlType::Varbinary,
1151            SqlType::Time,
1152            SqlType::Date,
1153            SqlType::Timestamp,
1154            SqlType::TimestampTz,
1155        ];
1156        types.extend(
1157            [
1158                IntervalUnit::Day,
1159                IntervalUnit::DayToHour,
1160                IntervalUnit::DayToMinute,
1161                IntervalUnit::DayToSecond,
1162                IntervalUnit::Hour,
1163                IntervalUnit::HourToMinute,
1164                IntervalUnit::HourToSecond,
1165                IntervalUnit::Minute,
1166                IntervalUnit::MinuteToSecond,
1167                IntervalUnit::Month,
1168                IntervalUnit::Second,
1169                IntervalUnit::Year,
1170                IntervalUnit::YearToMonth,
1171            ]
1172            .map(SqlType::Interval),
1173        );
1174        types.extend([
1175            SqlType::Array,
1176            SqlType::Struct,
1177            SqlType::Map,
1178            SqlType::Null,
1179            SqlType::Uuid,
1180            SqlType::Variant,
1181        ]);
1182        types
1183    }
1184
1185    /// The OpenAPI schema is generated from `SQL_TYPE_VALUES`, while the wire
1186    /// format comes from `Serialize`. This guards that the two never drift:
1187    /// every variant serializes to a value the schema advertises, and the
1188    /// schema lists exactly those values (no stale or missing entries).
1189    #[test]
1190    fn sql_type_schema_matches_serialization() {
1191        let serialized: Vec<String> = all_sql_types()
1192            .iter()
1193            .map(|t| {
1194                serde_json::to_value(t)
1195                    .unwrap()
1196                    .as_str()
1197                    .unwrap()
1198                    .to_owned()
1199            })
1200            .collect();
1201
1202        // Every serialized value is advertised by the schema, and vice versa.
1203        assert_eq!(
1204            serialized,
1205            SQL_TYPE_VALUES
1206                .iter()
1207                .map(|s| s.to_string())
1208                .collect::<Vec<_>>(),
1209            "SQL_TYPE_VALUES (the OpenAPI enum) must match `Serialize` output exactly"
1210        );
1211
1212        // `as_wire_str` and `Serialize` must agree.
1213        for t in all_sql_types() {
1214            assert_eq!(
1215                serde_json::to_value(t).unwrap().as_str().unwrap(),
1216                t.as_wire_str()
1217            );
1218        }
1219    }
1220
1221    #[test]
1222    fn serde_sql_type() {
1223        for (sql_str_base, expected_value) in [
1224            ("Boolean", SqlType::Boolean),
1225            ("Uuid", SqlType::Uuid),
1226            ("TinyInt", SqlType::TinyInt),
1227            ("SmallInt", SqlType::SmallInt),
1228            ("Integer", SqlType::Int),
1229            ("BigInt", SqlType::BigInt),
1230            ("UTinyInt", SqlType::UTinyInt),
1231            ("USmallInt", SqlType::USmallInt),
1232            ("UInteger", SqlType::UInt),
1233            ("UBigInt", SqlType::UBigInt),
1234            ("Real", SqlType::Real),
1235            ("Double", SqlType::Double),
1236            ("Decimal", SqlType::Decimal),
1237            ("Char", SqlType::Char),
1238            ("Varchar", SqlType::Varchar),
1239            ("Binary", SqlType::Binary),
1240            ("Varbinary", SqlType::Varbinary),
1241            ("Time", SqlType::Time),
1242            ("Date", SqlType::Date),
1243            ("Timestamp", SqlType::Timestamp),
1244            ("Timestamp_Tz", SqlType::TimestampTz),
1245            ("Interval_Day", SqlType::Interval(IntervalUnit::Day)),
1246            (
1247                "Interval_Day_Hour",
1248                SqlType::Interval(IntervalUnit::DayToHour),
1249            ),
1250            (
1251                "Interval_Day_Minute",
1252                SqlType::Interval(IntervalUnit::DayToMinute),
1253            ),
1254            (
1255                "Interval_Day_Second",
1256                SqlType::Interval(IntervalUnit::DayToSecond),
1257            ),
1258            ("Interval_Hour", SqlType::Interval(IntervalUnit::Hour)),
1259            (
1260                "Interval_Hour_Minute",
1261                SqlType::Interval(IntervalUnit::HourToMinute),
1262            ),
1263            (
1264                "Interval_Hour_Second",
1265                SqlType::Interval(IntervalUnit::HourToSecond),
1266            ),
1267            ("Interval_Minute", SqlType::Interval(IntervalUnit::Minute)),
1268            (
1269                "Interval_Minute_Second",
1270                SqlType::Interval(IntervalUnit::MinuteToSecond),
1271            ),
1272            ("Interval_Month", SqlType::Interval(IntervalUnit::Month)),
1273            ("Interval_Second", SqlType::Interval(IntervalUnit::Second)),
1274            ("Interval_Year", SqlType::Interval(IntervalUnit::Year)),
1275            (
1276                "Interval_Year_Month",
1277                SqlType::Interval(IntervalUnit::YearToMonth),
1278            ),
1279            ("Array", SqlType::Array),
1280            ("Struct", SqlType::Struct),
1281            ("Map", SqlType::Map),
1282            ("Null", SqlType::Null),
1283            ("Variant", SqlType::Variant),
1284        ] {
1285            for sql_str in [
1286                sql_str_base,                 // Capitalized
1287                &sql_str_base.to_lowercase(), // lowercase
1288                &sql_str_base.to_uppercase(), // UPPERCASE
1289            ] {
1290                let value1: SqlType = serde_json::from_str(&format!("\"{}\"", sql_str))
1291                    .unwrap_or_else(|e| {
1292                        panic!(
1293                            "\"{sql_str}\" should deserialize into its SQL type: {}",
1294                            e.to_string()
1295                        )
1296                    });
1297                assert_eq!(value1, expected_value);
1298                let serialized_str =
1299                    serde_json::to_string(&value1).expect("Value should serialize into JSON");
1300                let value2: SqlType = serde_json::from_str(&serialized_str).unwrap_or_else(|_| {
1301                    panic!(
1302                        "{} should deserialize back into its SQL type",
1303                        serialized_str
1304                    )
1305                });
1306                assert_eq!(value1, value2);
1307            }
1308        }
1309    }
1310
1311    #[test]
1312    fn deserialize_interval_types() {
1313        use super::IntervalUnit::*;
1314        use super::SqlType::*;
1315
1316        let schema = r#"
1317{
1318  "inputs" : [ {
1319    "name" : "sales",
1320    "case_sensitive" : false,
1321    "fields" : [ {
1322      "name" : "sales_id",
1323      "case_sensitive" : false,
1324      "columntype" : {
1325        "type" : "INTEGER",
1326        "nullable" : true
1327      }
1328    }, {
1329      "name" : "customer_id",
1330      "case_sensitive" : false,
1331      "columntype" : {
1332        "type" : "INTEGER",
1333        "nullable" : true
1334      }
1335    }, {
1336      "name" : "age",
1337      "case_sensitive" : false,
1338      "columntype" : {
1339        "type" : "UINTEGER",
1340        "nullable" : true
1341      }
1342    }, {
1343      "name" : "amount",
1344      "case_sensitive" : false,
1345      "columntype" : {
1346        "type" : "DECIMAL",
1347        "nullable" : true,
1348        "precision" : 10,
1349        "scale" : 2
1350      }
1351    }, {
1352      "name" : "sale_date",
1353      "case_sensitive" : false,
1354      "columntype" : {
1355        "type" : "DATE",
1356        "nullable" : true
1357      }
1358    } ],
1359    "primary_key" : [ "sales_id" ]
1360  } ],
1361  "outputs" : [ {
1362    "name" : "salessummary",
1363    "case_sensitive" : false,
1364    "fields" : [ {
1365      "name" : "customer_id",
1366      "case_sensitive" : false,
1367      "columntype" : {
1368        "type" : "INTEGER",
1369        "nullable" : true
1370      }
1371    }, {
1372      "name" : "total_sales",
1373      "case_sensitive" : false,
1374      "columntype" : {
1375        "type" : "DECIMAL",
1376        "nullable" : true,
1377        "precision" : 38,
1378        "scale" : 2
1379      }
1380    }, {
1381      "name" : "interval_day",
1382      "case_sensitive" : false,
1383      "columntype" : {
1384        "type" : "INTERVAL_DAY",
1385        "nullable" : false,
1386        "precision" : 2,
1387        "scale" : 6
1388      }
1389    }, {
1390      "name" : "interval_day_to_hour",
1391      "case_sensitive" : false,
1392      "columntype" : {
1393        "type" : "INTERVAL_DAY_HOUR",
1394        "nullable" : false,
1395        "precision" : 2,
1396        "scale" : 6
1397      }
1398    }, {
1399      "name" : "interval_day_to_minute",
1400      "case_sensitive" : false,
1401      "columntype" : {
1402        "type" : "INTERVAL_DAY_MINUTE",
1403        "nullable" : false,
1404        "precision" : 2,
1405        "scale" : 6
1406      }
1407    }, {
1408      "name" : "interval_day_to_second",
1409      "case_sensitive" : false,
1410      "columntype" : {
1411        "type" : "INTERVAL_DAY_SECOND",
1412        "nullable" : false,
1413        "precision" : 2,
1414        "scale" : 6
1415      }
1416    }, {
1417      "name" : "interval_hour",
1418      "case_sensitive" : false,
1419      "columntype" : {
1420        "type" : "INTERVAL_HOUR",
1421        "nullable" : false,
1422        "precision" : 2,
1423        "scale" : 6
1424      }
1425    }, {
1426      "name" : "interval_hour_to_minute",
1427      "case_sensitive" : false,
1428      "columntype" : {
1429        "type" : "INTERVAL_HOUR_MINUTE",
1430        "nullable" : false,
1431        "precision" : 2,
1432        "scale" : 6
1433      }
1434    }, {
1435      "name" : "interval_hour_to_second",
1436      "case_sensitive" : false,
1437      "columntype" : {
1438        "type" : "INTERVAL_HOUR_SECOND",
1439        "nullable" : false,
1440        "precision" : 2,
1441        "scale" : 6
1442      }
1443    }, {
1444      "name" : "interval_minute",
1445      "case_sensitive" : false,
1446      "columntype" : {
1447        "type" : "INTERVAL_MINUTE",
1448        "nullable" : false,
1449        "precision" : 2,
1450        "scale" : 6
1451      }
1452    }, {
1453      "name" : "interval_minute_to_second",
1454      "case_sensitive" : false,
1455      "columntype" : {
1456        "type" : "INTERVAL_MINUTE_SECOND",
1457        "nullable" : false,
1458        "precision" : 2,
1459        "scale" : 6
1460      }
1461    }, {
1462      "name" : "interval_month",
1463      "case_sensitive" : false,
1464      "columntype" : {
1465        "type" : "INTERVAL_MONTH",
1466        "nullable" : false
1467      }
1468    }, {
1469      "name" : "interval_second",
1470      "case_sensitive" : false,
1471      "columntype" : {
1472        "type" : "INTERVAL_SECOND",
1473        "nullable" : false,
1474        "precision" : 2,
1475        "scale" : 6
1476      }
1477    }, {
1478      "name" : "interval_year",
1479      "case_sensitive" : false,
1480      "columntype" : {
1481        "type" : "INTERVAL_YEAR",
1482        "nullable" : false
1483      }
1484    }, {
1485      "name" : "interval_year_to_month",
1486      "case_sensitive" : false,
1487      "columntype" : {
1488        "type" : "INTERVAL_YEAR_MONTH",
1489        "nullable" : false
1490      }
1491    } ]
1492  } ]
1493}
1494"#;
1495
1496        let schema: super::ProgramSchema = serde_json::from_str(schema).unwrap();
1497        let types = schema
1498            .outputs
1499            .iter()
1500            .flat_map(|r| r.fields.iter().map(|f| f.columntype.typ));
1501        let expected_types = [
1502            Int,
1503            Decimal,
1504            Interval(Day),
1505            Interval(DayToHour),
1506            Interval(DayToMinute),
1507            Interval(DayToSecond),
1508            Interval(Hour),
1509            Interval(HourToMinute),
1510            Interval(HourToSecond),
1511            Interval(Minute),
1512            Interval(MinuteToSecond),
1513            Interval(Month),
1514            Interval(Second),
1515            Interval(Year),
1516            Interval(YearToMonth),
1517        ];
1518
1519        assert_eq!(types.collect::<Vec<_>>(), &expected_types);
1520    }
1521
1522    #[test]
1523    fn serialize_struct_schemas() {
1524        let schema = r#"{
1525  "inputs" : [ {
1526    "name" : "PERS",
1527    "case_sensitive" : false,
1528    "fields" : [ {
1529      "name" : "P0",
1530      "case_sensitive" : false,
1531      "columntype" : {
1532        "fields" : [ {
1533          "type" : "VARCHAR",
1534          "nullable" : true,
1535          "precision" : 30,
1536          "name" : "FIRSTNAME"
1537        }, {
1538          "type" : "VARCHAR",
1539          "nullable" : true,
1540          "precision" : 30,
1541          "name" : "LASTNAME"
1542        }, {
1543          "type" : "UINTEGER",
1544          "nullable" : true,
1545          "name" : "AGE"
1546        }, {
1547          "fields" : {
1548            "fields" : [ {
1549              "type" : "VARCHAR",
1550              "nullable" : true,
1551              "precision" : 30,
1552              "name" : "STREET"
1553            }, {
1554              "type" : "VARCHAR",
1555              "nullable" : true,
1556              "precision" : 30,
1557              "name" : "CITY"
1558            }, {
1559              "type" : "CHAR",
1560              "nullable" : true,
1561              "precision" : 2,
1562              "name" : "STATE"
1563            }, {
1564              "type" : "VARCHAR",
1565              "nullable" : true,
1566              "precision" : 6,
1567              "name" : "POSTAL_CODE"
1568            } ],
1569            "nullable" : false
1570          },
1571          "nullable" : false,
1572          "name" : "ADDRESS"
1573        } ],
1574        "nullable" : false
1575      }
1576    }]
1577  } ],
1578  "outputs" : [ ]
1579}
1580"#;
1581        let schema: super::ProgramSchema = serde_json::from_str(schema).unwrap();
1582        eprintln!("{:#?}", schema);
1583        let pers = schema.inputs.iter().find(|r| r.name == "PERS").unwrap();
1584        let p0 = pers.fields.iter().find(|f| f.name == "P0").unwrap();
1585        assert_eq!(p0.columntype.typ, SqlType::Struct);
1586        let p0_fields = p0.columntype.fields.as_ref().unwrap();
1587        assert_eq!(p0_fields[0].columntype.typ, SqlType::Varchar);
1588        assert_eq!(p0_fields[1].columntype.typ, SqlType::Varchar);
1589        assert_eq!(p0_fields[2].columntype.typ, SqlType::UInt);
1590        assert_eq!(p0_fields[3].columntype.typ, SqlType::Struct);
1591        assert_eq!(p0_fields[3].name, "ADDRESS");
1592        let address = &p0_fields[3].columntype.fields.as_ref().unwrap();
1593        assert_eq!(address.len(), 4);
1594        assert_eq!(address[0].name, "STREET");
1595        assert_eq!(address[0].columntype.typ, SqlType::Varchar);
1596        assert_eq!(address[1].columntype.typ, SqlType::Varchar);
1597        assert_eq!(address[2].columntype.typ, SqlType::Char);
1598        assert_eq!(address[3].columntype.typ, SqlType::Varchar);
1599    }
1600
1601    #[test]
1602    fn sql_identifier_cmp() {
1603        assert_eq!(SqlIdentifier::from("foo"), SqlIdentifier::from("foo"));
1604        assert_ne!(SqlIdentifier::from("foo"), SqlIdentifier::from("bar"));
1605        assert_eq!(SqlIdentifier::from("bar"), SqlIdentifier::from("BAR"));
1606        assert_eq!(SqlIdentifier::from("foo"), SqlIdentifier::from("\"foo\""));
1607        assert_eq!(SqlIdentifier::from("bar"), SqlIdentifier::from("\"bar\""));
1608        assert_eq!(SqlIdentifier::from("bAr"), SqlIdentifier::from("\"bAr\""));
1609        assert_eq!(
1610            SqlIdentifier::new("bAr", true),
1611            SqlIdentifier::from("\"bAr\"")
1612        );
1613
1614        assert_eq!(SqlIdentifier::from("bAr"), "bar");
1615        assert_eq!(SqlIdentifier::from("bAr"), "bAr");
1616    }
1617
1618    #[test]
1619    fn sql_identifier_ord() {
1620        let mut btree = std::collections::BTreeSet::new();
1621        assert!(btree.insert(SqlIdentifier::from("foo")));
1622        assert!(btree.insert(SqlIdentifier::from("bar")));
1623        assert!(!btree.insert(SqlIdentifier::from("BAR")));
1624        assert!(!btree.insert(SqlIdentifier::from("\"foo\"")));
1625        assert!(!btree.insert(SqlIdentifier::from("\"bar\"")));
1626    }
1627
1628    #[test]
1629    fn sql_identifier_hash() {
1630        let mut hs = std::collections::HashSet::new();
1631        assert!(hs.insert(SqlIdentifier::from("foo")));
1632        assert!(hs.insert(SqlIdentifier::from("bar")));
1633        assert!(!hs.insert(SqlIdentifier::from("BAR")));
1634        assert!(!hs.insert(SqlIdentifier::from("\"foo\"")));
1635        assert!(!hs.insert(SqlIdentifier::from("\"bar\"")));
1636    }
1637
1638    #[test]
1639    fn sql_identifier_name() {
1640        assert_eq!(SqlIdentifier::from("foo").name(), "foo");
1641        assert_eq!(SqlIdentifier::from("bAr").name(), "bar");
1642        assert_eq!(SqlIdentifier::from("\"bAr\"").name(), "bAr");
1643        assert_eq!(SqlIdentifier::from("foo").sql_name(), "foo");
1644        assert_eq!(SqlIdentifier::from("bAr").sql_name(), "bAr");
1645        assert_eq!(SqlIdentifier::from("\"bAr\"").sql_name(), "\"bAr\"");
1646    }
1647
1648    #[test]
1649    fn issue3277() {
1650        let schema = r#"{
1651      "name" : "j",
1652      "case_sensitive" : false,
1653      "columntype" : {
1654        "fields" : [ {
1655          "key" : {
1656            "nullable" : false,
1657            "precision" : -1,
1658            "type" : "VARCHAR"
1659          },
1660          "name" : "s",
1661          "nullable" : true,
1662          "type" : "MAP",
1663          "value" : {
1664            "nullable" : true,
1665            "precision" : -1,
1666            "type" : "VARCHAR"
1667          }
1668        } ],
1669        "nullable" : true
1670      }
1671    }"#;
1672        let field: Field = serde_json::from_str(schema).unwrap();
1673        println!("field: {:#?}", field);
1674        assert_eq!(
1675            field,
1676            Field {
1677                name: SqlIdentifier {
1678                    name: "j".to_string(),
1679                    case_sensitive: false,
1680                },
1681                columntype: ColumnType {
1682                    typ: SqlType::Struct,
1683                    nullable: true,
1684                    precision: None,
1685                    scale: None,
1686                    component: None,
1687                    fields: Some(vec![Field {
1688                        name: SqlIdentifier {
1689                            name: "s".to_string(),
1690                            case_sensitive: false,
1691                        },
1692                        columntype: ColumnType {
1693                            typ: SqlType::Map,
1694                            nullable: true,
1695                            precision: None,
1696                            scale: None,
1697                            component: None,
1698                            fields: None,
1699                            key: Some(Box::new(ColumnType {
1700                                typ: SqlType::Varchar,
1701                                nullable: false,
1702                                precision: Some(-1),
1703                                scale: None,
1704                                component: None,
1705                                fields: None,
1706                                key: None,
1707                                value: None,
1708                            })),
1709                            value: Some(Box::new(ColumnType {
1710                                typ: SqlType::Varchar,
1711                                nullable: true,
1712                                precision: Some(-1),
1713                                scale: None,
1714                                component: None,
1715                                fields: None,
1716                                key: None,
1717                                value: None,
1718                            })),
1719                        },
1720                        lateness: None,
1721                        default: None,
1722                        unused: false,
1723                        watermark: None,
1724                    }]),
1725                    key: None,
1726                    value: None,
1727                },
1728                lateness: None,
1729                default: None,
1730                unused: false,
1731                watermark: None,
1732            }
1733        );
1734    }
1735}