1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/// Generates the query string for an upsert in an archive table
#[macro_export]
macro_rules! upsert_archive_query {
  ($table:ident (
    time ( $($)?$tid:literal $period:ident, $retrieved_at:ident ),
    primary ( $( $($)?$pid:literal $pname:ident :: $pty:ident ),* $(,)? ),
    data ( $( $($)?$did:literal $dname:ident :: $dty:ident $(?)? ),* $(,)? )
    $(, unique($(
      $ukey:ident( $( $uname:ident ),* $(,)? )
    ),* $(,)?))?
    $(,)?
  )) => {
    upsert_archive_query!(
      @inner $table (
        time ( $tid $period, $retrieved_at ),
        primary ( $( $pid $pname :: $pty ),* ),
        primary_keys ( concat!(stringify!($period) $(, ", ", stringify!($pname))*) ),
        data ( $( $did $dname :: $dty ),* )
        $(, unique($(
          $ukey( stringify!( $( $uname ),* ) )
        ),*))?
      )
    )
  };
  (@inner $table:ident (
    time ( $($)?$tid:literal $period:ident, $retrieved_at:ident ),
    primary ( $( $($)?$pid:literal $pname:ident :: $pty:ident ),* $(,)? ),
    primary_keys( $primary_keys:expr ),
    data ( $( $($)?$did:literal $dname:ident :: $dty:ident ),* $(,)? )
    $(, unique($(
      $ukey:ident( $ulist:expr )
    ),* $(,)?))?
    $(,)?
  )) => {
    concat!(
      "WITH input_row(",
        $primary_keys, ", ", stringify!($retrieved_at), $(", ", stringify!($dname)),*,
      ") AS (VALUES(",
        "PERIOD($", stringify!($tid), "::INSTANT, NULL)", $(", $", stringify!($pid), "::", stringify!($pty)),*,
        ", ARRAY[$", stringify!($tid), "::INSTANT]",
        $(", $", stringify!($did), "::", stringify!($dty)),*,
      ")), ",
      "current_row_primary AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
        "WHERE upper_inf(period)" $(, " AND $", stringify!($pid), "::", stringify!($pty), " = ", stringify!($pname))*, " ",
        "LIMIT 1",
      "), ",
      $($(
      "current_row_", stringify!($ukey), " AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
        "WHERE upper_inf(period) AND ROW(", $ulist, ") = (SELECT ", $ulist, " FROM input_row) ",
        "LIMIT 1",
      "), ",
      )*)?
      "matching_current_row AS (",
        "SELECT ", $primary_keys, " ",
        "FROM ", stringify!($table), " ",
          "INNER JOIN current_row_primary USING(", $primary_keys, ") ",
          $($(
          "INNER JOIN current_row_", stringify!($ukey), " USING(", $primary_keys, ") ",
          )*)?
        "WHERE TRUE" $(, " AND $", stringify!($did), "::", stringify!($dty), " IS NOT DISTINCT FROM ", stringify!($table), ".", stringify!($dname))*,
      "), ",
      "missing_input_row AS (",
        "SELECT * ",
        "FROM input_row ",
        "WHERE NOT EXISTS (SELECT 1 FROM matching_current_row)",
      "), ",
      "current_rows AS (",
        "SELECT * FROM current_row_primary",
        $($(
        " UNION SELECT * FROM current_row_", stringify!($ukey),
        )*)?
      "), ",
      "rows_to_invalidate AS (",
        "SELECT * FROM current_rows ",
        "EXCEPT ",
        "SELECT * FROM matching_current_row",
      "), ",
      "invalidated_rows AS (",
        "UPDATE ", stringify!($table), " ",
        "SET ", stringify!($period), " = PERIOD(lower(", stringify!($period), "), $", stringify!($tid), "::INSTANT) ",
        "WHERE ROW(", $primary_keys, ") IN (SELECT ", $primary_keys, " FROM rows_to_invalidate) ",
        "RETURNING ", $primary_keys,
      "), ",
      "updated_row AS (",
        "UPDATE ", stringify!($table), " ",
        "SET ", stringify!($retrieved_at), " = ordered_set_insert(", stringify!($retrieved_at), "::ANYARRAY, $", stringify!($tid), "::INSTANT::ANYELEMENT) ",
        "WHERE ROW(", $primary_keys, ") = (SELECT ", $primary_keys, " FROM matching_current_row) ",
        "RETURNING ", $primary_keys,
      "), ",
      "inserted_row AS (",
        "INSERT ",
        "INTO ", stringify!($table), "(", $primary_keys, ", ", stringify!($retrieved_at) $(, ", ", stringify!($dname))*, ") ",
        "SELECT * FROM missing_input_row ",
        "RETURNING ", $primary_keys,
      ") ",
      "SELECT * FROM invalidated_rows ",
      "UNION ALL ",
      "SELECT * FROM updated_row ",
      "UNION ALL ",
      "SELECT * FROM inserted_row;",
    )
  };
}