pg-client 0.3.0

PostgreSQL client configuration and connection management
Documentation
//! Index creation for partitioned tables.

use core::num::NonZeroU16;
use std::collections::BTreeSet;

use sqlx::Row as _;
use sqlx::SqlSafeStr as _;

use super::{ConcurrentlyConfig, Error, FillFactor, SqlFragment};
use crate::identifier::{AccessMethod, Index, QualifiedTable, Table};

/// Input parameters for adding an index to a partitioned table.
///
/// Describes the index to build (key expression, uniqueness, access method,
/// optional `INCLUDE`/`WHERE`/fillfactor) plus per-partition concurrency
/// settings.
pub struct Input {
    /// The schema-qualified parent partitioned table.
    pub qualified_table: QualifiedTable,
    /// The desired parent index name; partition index names are derived from
    /// it (parent name + `_` + partition table name, truncated to 63 bytes).
    pub index: Index,
    /// The index key expression without surrounding parentheses (e.g. `"lower(email), account_id"`).
    pub key_expression: SqlFragment,
    /// Whether the index should enforce uniqueness.
    pub unique: bool,
    /// The index access method (e.g. btree, hash).
    pub method: AccessMethod,
    /// An optional `INCLUDE` clause for covering indexes (without `INCLUDE` keyword or parentheses).
    pub include: Option<SqlFragment>,
    /// An optional `WHERE` clause (without the `WHERE` keyword) for partial indexes.
    pub where_clause: Option<SqlFragment>,
    /// An optional index fillfactor (1-100).
    pub fillfactor: Option<FillFactor>,
    /// Concurrency settings for partition index creation
    /// (which partitions use `CREATE INDEX CONCURRENTLY`).
    pub concurrently: ConcurrentlyConfig,
}

/// Result of a successful index addition.
#[derive(Debug)]
pub struct Result {
    /// Total wall-clock time elapsed during the operation.
    pub elapsed: std::time::Duration,
    /// Partitions that were processed, with the statements used for each.
    pub partitions: Vec<Partition>,
}

impl std::fmt::Display for Result {
    /// Human-readable one-line summary: partition count and wall-clock seconds.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let count = self.partitions.len();
        let seconds = self.elapsed.as_secs_f64();
        write!(f, "Created {count} partition indexes in {seconds:.2}s")
    }
}

/// A partition with its derived index name and SQL statements for creation.
///
/// `create_index_statement` already reflects the concurrency choice made for
/// this partition (plain vs. `CONCURRENTLY`).
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Partition {
    /// Schema-qualified partition table name.
    #[serde(flatten)]
    pub qualified_table: QualifiedTable,
    /// Partition index name (derived from the parent index name).
    pub index: Index,
    /// Partition `CREATE INDEX` statement.
    #[serde(deserialize_with = "super::sql_str_serde::deserialize")]
    pub create_index_statement: sqlx::SqlStr,
    /// `ALTER INDEX ATTACH PARTITION` statement that links this partition's
    /// index to the parent index.
    #[serde(deserialize_with = "super::sql_str_serde::deserialize")]
    pub attach_statement: sqlx::SqlStr,
}

/// Raw per-partition row decoded from the JSON aggregate built by the
/// discovery query.
///
/// Carries both the plain and `CONCURRENTLY` variants of the `CREATE INDEX`
/// statement; `fetch_statements` picks one per partition when converting to
/// [`Partition`].
#[derive(Debug, Clone, serde::Deserialize)]
struct PartitionRow {
    /// Schema-qualified partition table name.
    #[serde(flatten)]
    qualified_table: QualifiedTable,
    /// Partition index name.
    index: Index,
    /// Partition `CREATE INDEX` statement (non-concurrent).
    #[serde(deserialize_with = "super::sql_str_serde::deserialize")]
    create_index_statement: sqlx::SqlStr,
    /// Partition `CREATE INDEX CONCURRENTLY` statement.
    #[serde(deserialize_with = "super::sql_str_serde::deserialize")]
    create_index_statement_concurrently: sqlx::SqlStr,
    /// `ALTER INDEX ATTACH PARTITION` statement.
    #[serde(deserialize_with = "super::sql_str_serde::deserialize")]
    attach_statement: sqlx::SqlStr,
}

/// All SQL statements needed for the index addition protocol.
///
/// Produced by [`fetch_statements`]; `run` executes them in protocol order
/// (partition indexes, then the parent stub, then the attachments).
pub struct Statements {
    /// The `CREATE INDEX ON ONLY` statement for the parent table.
    pub parent_create: sqlx::SqlStr,
    /// Per-partition information including SQL statements.
    pub partitions: Vec<Partition>,
}

/// Fetch all SQL statements for the index addition protocol.
///
/// Returns the parent `CREATE INDEX ON ONLY` statement, per-partition
/// `CREATE INDEX` statements, and `ALTER INDEX ATTACH PARTITION` statements.
/// All statements are generated server-side via `format()` with `%I` identifier quoting.
///
/// # Errors
///
/// Returns [`Error::NoPartitions`] when the parent table has no partitions,
/// [`Error::UnknownPartitionTables`] when the concurrency config names tables
/// that are not partitions, and propagates any `sqlx` error from the query.
pub async fn fetch_statements(
    config: &crate::Config,
    input: &Input,
) -> core::result::Result<Statements, Error> {
    // Pre-render every optional clause to a plain string; the query below
    // treats the empty string as "clause absent".
    let method = input.method.as_str();
    let unique_keyword = if input.unique { "UNIQUE " } else { "" };
    let key_expression = input.key_expression.as_str();
    let include_clause = input
        .include
        .as_ref()
        .map(|i| i.as_str())
        .unwrap_or_default();
    let where_clause = input
        .where_clause
        .as_ref()
        .map(|w| w.as_str())
        .unwrap_or_default();
    let fillfactor = input
        .fillfactor
        .map(|value| value.as_u8().to_string())
        .unwrap_or_default();

    // Single round trip: one row containing the parent statement plus a JSON
    // array of per-partition statements, all formatted server-side so that
    // identifier quoting goes through Postgres `format()`'s %I.
    let (parent_create, partitions) = config
        .with_sqlx_connection(async |connection| {
            let row = sqlx::query(indoc::indoc! {"
                WITH
                  params
                  ( parent_table
                  , schema_name
                  , parent_index
                  , access_method
                  , unique_keyword
                  , key_expression
                  , include_clause
                  , fillfactor
                  , where_clause
                  ) AS (
                    VALUES
                      ( $1::text
                      , $2::text
                      , $3::text
                      , $4::text
                      , $5::text
                      , $6::text
                      , $7::text
                      , $8::text
                      , $9::text
                      )
                  )
                , fragments AS (
                    SELECT
                      derived.include_clause
                    , derived.storage_clause
                    , derived.where_clause
                    , format
                      ( 'CREATE %sINDEX %I ON ONLY %I.%I USING %I (%s)%s%s%s'
                      , params.unique_keyword
                      , params.parent_index
                      , params.schema_name
                      , params.parent_table
                      , params.access_method
                      , params.key_expression
                      , derived.include_clause
                      , derived.storage_clause
                      , derived.where_clause
                      ) AS parent_create_statement
                    , format
                      ( 'CREATE %sINDEX %%I ON %%I.%%I USING %I (%s)%s%s%s'
                      , params.unique_keyword
                      , params.access_method
                      , params.key_expression
                      , derived.include_clause
                      , derived.storage_clause
                      , derived.where_clause
                      ) AS create_index_template
                    , format
                      ( 'CREATE %sINDEX CONCURRENTLY %%I ON %%I.%%I USING %I (%s)%s%s%s'
                      , params.unique_keyword
                      , params.access_method
                      , params.key_expression
                      , derived.include_clause
                      , derived.storage_clause
                      , derived.where_clause
                      ) AS create_index_template_concurrently
                    , format
                      ( 'ALTER INDEX %I.%I ATTACH PARTITION %%I.%%I'
                      , params.schema_name
                      , params.parent_index
                      ) AS attach_index_template
                    FROM
                      params
                    CROSS JOIN LATERAL (
                      SELECT
                        CASE WHEN params.include_clause = '' THEN '' ELSE ' INCLUDE (' || params.include_clause || ')' END
                      , CASE WHEN params.fillfactor = '' THEN '' ELSE ' WITH (fillfactor = ' || params.fillfactor || ')' END
                      , CASE WHEN params.where_clause = '' THEN '' ELSE ' WHERE ' || params.where_clause END
                    ) AS derived(include_clause, storage_clause, where_clause)
                  )
                , partitions AS (
                    SELECT
                      child_namespace.nspname AS schema
                    , child_class.relname AS table
                    , derived.partition_index_name AS index
                    , format
                      ( fragments.create_index_template
                      , derived.partition_index_name
                      , child_namespace.nspname
                      , child_class.relname
                      ) AS create_index_statement
                    , format
                      ( fragments.create_index_template_concurrently
                      , derived.partition_index_name
                      , child_namespace.nspname
                      , child_class.relname
                      ) AS create_index_statement_concurrently
                    , format
                      ( fragments.attach_index_template
                      , child_namespace.nspname
                      , derived.partition_index_name
                      ) AS attach_statement
                    FROM
                      params
                    CROSS JOIN
                      fragments
                    CROSS JOIN
                      pg_inherits
                    JOIN
                      pg_class AS parent_class
                    ON
                      parent_class.oid = pg_inherits.inhparent
                    JOIN
                      pg_class AS child_class
                    ON
                      child_class.oid = pg_inherits.inhrelid
                    CROSS JOIN LATERAL (
                      SELECT
                        CASE
                          WHEN octet_length(base_name) <= 63 THEN base_name
                          ELSE left(base_name, 54) || '_' || substr(md5(base_name), 1, 8)
                        END AS partition_index_name
                      FROM (
                        SELECT params.parent_index || '_' || child_class.relname AS base_name
                      ) AS base
                    ) AS derived(partition_index_name)
                    JOIN
                      pg_namespace AS parent_namespace
                    ON
                      parent_namespace.oid = parent_class.relnamespace
                    JOIN
                      pg_namespace AS child_namespace
                    ON
                      child_namespace.oid = child_class.relnamespace
                    WHERE
                      parent_class.relkind = 'p'
                    AND
                      parent_class.relname = params.parent_table
                    AND
                      parent_namespace.nspname = params.schema_name
                  )
                SELECT
                  fragments.parent_create_statement
                , (
                    SELECT
                      COALESCE(jsonb_agg(
                        jsonb_build_object
                          ( 'schema', partitions.schema
                          , 'table', partitions.table
                          , 'index', partitions.index
                          , 'create_index_statement', partitions.create_index_statement
                          , 'create_index_statement_concurrently', partitions.create_index_statement_concurrently
                          , 'attach_statement', partitions.attach_statement
                          )
                        ORDER BY partitions.schema, partitions.table
                      ), '[]'::jsonb)
                    FROM
                      partitions
                  ) AS partitions
                FROM
                  fragments
            "})
            // Bind order mirrors the params CTE column list above.
            .bind(input.qualified_table.table.as_str()) // $1 parent_table
            .bind(input.qualified_table.schema.as_str()) // $2 schema_name
            .bind(input.index.as_str()) // $3 parent_index
            .bind(method) // $4 access_method
            .bind(unique_keyword) // $5 unique_keyword
            .bind(key_expression) // $6 key_expression
            .bind(include_clause) // $7 include_clause
            // $8 fillfactor — NOTE(review): cloned, presumably so the async
            // closure keeps only a borrow of the local; confirm the clone is
            // actually required.
            .bind(fillfactor.clone())
            .bind(where_clause) // $9 where_clause
            .fetch_one(connection)
            .await?;

            let parent_create: String = row.get("parent_create_statement");
            let partitions_json: serde_json::Value = row.get("partitions");
            // The JSON shape is fixed by jsonb_build_object in the query, so a
            // deserialization failure here would be a bug, not a runtime error.
            let partitions: Vec<PartitionRow> =
                serde_json::from_value(partitions_json).expect("valid partition JSON from database");

            Ok::<_, sqlx::Error>((parent_create, partitions))
        })
        .await??;

    // A partitioned table with zero partitions has nothing to index.
    if partitions.is_empty() {
        return Err(Error::NoPartitions {
            qualified_table: input.qualified_table.clone(),
        });
    }

    // Reject concurrency-config entries that name tables which are not
    // actually partitions of the parent.
    let partition_tables: BTreeSet<Table> = partitions
        .iter()
        .map(|partition| partition.qualified_table.table.clone())
        .collect();
    validate_concurrently_tables(&input.concurrently, &partition_tables)?;

    // Per partition, keep either the plain or the CONCURRENTLY variant of the
    // CREATE INDEX statement, as configured.
    let partitions = partitions
        .into_iter()
        .map(|partition| {
            let create_index_statement = if input
                .concurrently
                .is_concurrent_for(&partition.qualified_table.table)
            {
                partition.create_index_statement_concurrently
            } else {
                partition.create_index_statement
            };

            Partition {
                qualified_table: partition.qualified_table,
                index: partition.index,
                create_index_statement,
                attach_statement: partition.attach_statement,
            }
        })
        .collect();

    // AssertSqlSafe: the statement text was generated server-side with %I
    // identifier quoting (see the query above), not assembled from raw input.
    Ok(Statements {
        parent_create: sqlx::AssertSqlSafe(parent_create).into_sql_str(),
        partitions,
    })
}

/// Check that every table named by a `ConcurrentlyConfig::Except` set is an
/// actual partition of the parent table.
///
/// `None` and `All` name no specific tables, so they always pass. Returns
/// `Error::UnknownPartitionTables` listing any requested table that is not
/// present in `partition_tables`.
fn validate_concurrently_tables(
    concurrently: &ConcurrentlyConfig,
    partition_tables: &BTreeSet<Table>,
) -> core::result::Result<(), Error> {
    // Only the `Except` variant carries an explicit table list to validate.
    let ConcurrentlyConfig::Except(requested_tables) = concurrently else {
        return Ok(());
    };

    let unknown_tables: BTreeSet<Table> = requested_tables
        .difference(partition_tables)
        .cloned()
        .collect();

    if unknown_tables.is_empty() {
        Ok(())
    } else {
        Err(Error::UnknownPartitionTables {
            tables: unknown_tables,
        })
    }
}

/// Worker that creates indexes on partitions from the queue.
///
/// Each worker opens one database connection and repeatedly pops the next
/// partition off the shared queue, executing that partition's `CREATE INDEX`
/// statement, until the queue is empty.
async fn worker(
    config: std::sync::Arc<crate::Config>,
    queue: std::sync::Arc<tokio::sync::Mutex<std::collections::VecDeque<Partition>>>,
) -> core::result::Result<(), Error> {
    Ok(config
        .as_ref()
        .with_sqlx_connection(async move |connection| {
            loop {
                // Pop in its own statement: the mutex guard is a temporary
                // that is dropped at the end of this statement, so the lock
                // is NOT held while the (potentially long) CREATE INDEX runs.
                let partition = queue.lock().await.pop_front();

                // Empty queue means all partitions are done or claimed.
                let Some(partition) = partition else {
                    break;
                };

                log::info!(
                    "Creating index {} on {}",
                    partition.index,
                    partition.qualified_table
                );

                sqlx::raw_sql(partition.create_index_statement.clone())
                    .execute(&mut *connection)
                    .await?;

                log::info!(
                    "Created index {} on {}",
                    partition.index,
                    partition.qualified_table
                );
            }

            Ok::<(), sqlx::Error>(())
        })
        .await??)
}

/// Add an index to a partitioned table and all its partitions.
///
/// This function coordinates the multi-step protocol:
/// 1. Discover partitions from `pg_catalog`
/// 2. `CREATE INDEX [CONCURRENTLY]` on each partition (parallel workers)
/// 3. `CREATE INDEX ON ONLY parent_table` (invalid stub on parent)
/// 4. `ALTER INDEX parent_index ATTACH PARTITION partition_index` for each partition
///
/// If `dry_run` is true, the SQL statements are logged but not executed.
pub async fn run(
    config: &crate::Config,
    input: &Input,
    jobs: NonZeroU16,
    dry_run: bool,
) -> core::result::Result<Result, Error> {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use tokio::sync::Mutex;
    use tokio::task::JoinSet;

    let start = std::time::Instant::now();

    let Statements {
        parent_create,
        partitions,
        ..
    } = fetch_statements(config, input).await?;

    if dry_run {
        for partition in &partitions {
            log::info!("[dry-run] {};", partition.create_index_statement.as_str());
        }
        log::info!("[dry-run] {};", parent_create.as_str());
        for partition in &partitions {
            log::info!("[dry-run] {};", partition.attach_statement.as_str());
        }
    } else {
        let partitions_for_workers = partitions.clone();

        // Step 2: Create indexes on partitions via parallel workers
        let shared_config = Arc::new(config.clone());
        let shared_queue = Arc::new(Mutex::new(VecDeque::from(partitions_for_workers)));
        let mut join_set = JoinSet::new();

        for _ in 0..jobs.get() {
            let worker_config = Arc::clone(&shared_config);
            let worker_queue = Arc::clone(&shared_queue);

            join_set.spawn(async move { worker(worker_config, worker_queue).await });
        }

        while let Some(result) = join_set.join_next().await {
            match result {
                Ok(worker_result) => worker_result?,
                Err(join_error) => return Err(Error::WorkerPanic(join_error)),
            }
        }

        // Steps 3-4: Create parent index stub and attach partition indexes
        config
            .with_sqlx_connection(async |connection| {
                log::info!("Creating parent index {}", input.index);

                sqlx::raw_sql(parent_create.clone())
                    .execute(&mut *connection)
                    .await?;

                log::info!("Created parent index {}", input.index);

                for partition in &partitions {
                    sqlx::raw_sql(partition.attach_statement.clone())
                        .execute(&mut *connection)
                        .await?;
                }

                Ok::<(), sqlx::Error>(())
            })
            .await??;
    }

    Ok(Result {
        elapsed: start.elapsed(),
        partitions,
    })
}