1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
/// Generates the query string for an upsert in an archive table
#[macro_export]
macro_rules! upsert_archive_query {
  ($table:ident (
    time ( $($)?$tid:literal $period:ident, $retrieved_at:ident ),
    primary ( $( $($)?$pid:literal $pname:ident :: $pty:ident ),* $(,)? ),
    data ( $( $($)?$did:literal $dname:ident :: $dty:ident $(?)? ),* $(,)? )
    $(, unique($(
      $ukey:ident( $( $uname:ident ),* $(,)? )
    ),* $(,)?))?
    $(,)?
  )) => {
    upsert_archive_query!(
      @inner $table (
        time ( $tid $period, $retrieved_at ),
        primary ( $( $pid $pname :: $pty ),* ),
        primary_keys ( concat!(stringify!($period) $(, ", ", stringify!($pname))*) ),
        data ( $( $did $dname :: $dty ),* )
        $(, unique($(
          $ukey( stringify!( $( $uname ),* ) )
        ),*))?
      )
    )
  };
  (@inner $table:ident (
    time ( $($)?$tid:literal $period:ident, $retrieved_at:ident ),
    primary ( $( $($)?$pid:literal $pname:ident :: $pty:ident ),* $(,)? ),
    primary_keys( $primary_keys:expr ),
    data ( $( $($)?$did:literal $dname:ident :: $dty:ident ),* $(,)? )
    $(, unique($(
      $ukey:ident( $ulist:expr )
    ),* $(,)?))?
    $(,)?
  )) => {
    concat!(
      "WITH input_row(",
        $primary_keys, ", ", stringify!($retrieved_at), $(", ", stringify!($dname)),*,
      ") AS (VALUES(",
        "PERIOD($", stringify!($tid), "::INSTANT, NULL)", $(", $", stringify!($pid), "::", stringify!($pty)),*,
        ", ARRAY[$", stringify!($tid), "::INSTANT]",
        $(", $", stringify!($did), "::", stringify!($dty)),*,
      ")), ",
      "current_row_primary AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
        "WHERE upper_inf(period)" $(, " AND $", stringify!($pid), "::", stringify!($pty), " = ", stringify!($pname))*, " ",
        "LIMIT 1",
      "), ",
      $($(
      "current_row_", stringify!($ukey), " AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
        "WHERE upper_inf(period) AND ROW(", $ulist, ") = (SELECT ", $ulist, " FROM input_row) ",
        "LIMIT 1",
      "), ",
      )*)?
      "matching_current_row AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
          "INNER JOIN current_row_primary USING(", $primary_keys, ") ",
          $($(
          "INNER JOIN current_row_", stringify!($ukey), " USING(", $primary_keys, ") ",
          )*)?
        "WHERE TRUE" $(, " AND $", stringify!($did), "::", stringify!($dty), " IS NOT DISTINCT FROM ", stringify!($table), ".", stringify!($dname))*,
      "), ",
      "missing_input_row AS (",
        "SELECT * ",
        "FROM input_row ",
        "WHERE NOT EXISTS (SELECT 1 FROM matching_current_row)",
      "), ",
      "current_rows AS (",
        "SELECT * FROM current_row_primary",
        $($(
        " UNION SELECT * FROM current_row_", stringify!($ukey),
        )*)?
      "), ",
      "rows_to_invalidate AS (",
        "SELECT * FROM current_rows ",
        "EXCEPT ",
        "SELECT * FROM matching_current_row",
      "), ",
      "invalidated_rows AS (",
        "UPDATE ", stringify!($table), " ",
        "SET ", stringify!($period), " = PERIOD(lower(", stringify!($period), "), $", stringify!($tid), "::INSTANT) ",
        "WHERE ROW(", $primary_keys, ") IN (SELECT ", $primary_keys, " FROM rows_to_invalidate) ",
        "RETURNING ", $primary_keys,
      "), ",
      "updated_row AS (",
        "UPDATE ", stringify!($table), " ",
        "SET ", stringify!($retrieved_at), " = ordered_set_insert(", stringify!($retrieved_at), "::ANYARRAY, $", stringify!($tid), "::INSTANT::ANYELEMENT) ",
        "WHERE ROW(", $primary_keys, ") = (SELECT ", $primary_keys, " FROM matching_current_row) ",
        "RETURNING ", $primary_keys,
      "), ",
      "inserted_row AS (",
        "INSERT ",
        "INTO ", stringify!($table), "(", $primary_keys, ", ", stringify!($retrieved_at) $(, ", ", stringify!($dname))*, ") ",
        "SELECT * FROM missing_input_row ",
        "RETURNING ", $primary_keys,
      ") ",
      "SELECT * FROM invalidated_rows ",
      "UNION ALL ",
      "SELECT * FROM updated_row ",
      "UNION ALL ",
      "SELECT * FROM inserted_row;",
    )
  };
}