database_replicator/replication/
publication.rs

1// ABOUTME: Publication management for logical replication on source database
2// ABOUTME: Creates and manages PostgreSQL publications for table replication
3
4use anyhow::{bail, Context, Result};
5use tokio_postgres::Client;
6
7use crate::filters::ReplicationFilter;
8use crate::table_rules::TableRuleKind;
9
10/// Create a publication for tables with optional filtering
11///
12/// When table filters are specified, creates a publication for only the filtered tables.
13/// Without filters, creates a publication for all tables.
14///
15/// # Arguments
16///
17/// * `client` - Connected client to the database
18/// * `db_name` - Name of the database (for filtering context)
19/// * `publication_name` - Name of the publication to create
20/// * `filter` - Replication filter for table inclusion/exclusion
21///
22/// # Returns
23///
24/// Returns `Ok(())` if publication is created or already exists
25pub async fn create_publication(
26    client: &Client,
27    db_name: &str,
28    publication_name: &str,
29    filter: &ReplicationFilter,
30) -> Result<()> {
31    // Validate publication name to prevent SQL injection
32    crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
33        format!(
34            "Invalid publication name '{}': must be a valid PostgreSQL identifier",
35            publication_name
36        )
37    })?;
38
39    tracing::info!("Creating publication '{}'...", publication_name);
40
41    if filter.is_empty() {
42        let query = format!(
43            "CREATE PUBLICATION {} FOR ALL TABLES",
44            crate::utils::quote_ident(publication_name)
45        );
46        return execute_publication_query(client, publication_name, &query).await;
47    }
48
49    let tables = crate::migration::list_tables(client).await?;
50
51    let mut plain_tables = Vec::new();
52    let mut predicate_tables = Vec::new();
53
54    for table in tables {
55        // Build "schema.table" identifier for include/exclude logic
56        let table_identifier = if table.schema == "public" {
57            table.name.clone()
58        } else {
59            format!("{}.{}", table.schema, table.name)
60        };
61
62        if !filter.should_replicate_table(db_name, &table_identifier) {
63            continue;
64        }
65
66        // Validate schema/table names
67        crate::utils::validate_postgres_identifier(&table.schema).with_context(|| {
68            format!(
69                "Invalid schema name '{}' for table '{}': must be a valid PostgreSQL identifier",
70                table.schema, table.name
71            )
72        })?;
73        crate::utils::validate_postgres_identifier(&table.name).with_context(|| {
74            format!(
75                "Invalid table name '{}' in schema '{}': must be a valid PostgreSQL identifier",
76                table.name, table.schema
77            )
78        })?;
79
80        let fq_table = format!("\"{}\".\"{}\"", table.schema, table.name);
81
82        match filter
83            .table_rules()
84            .rule_for_table(db_name, &table.schema, &table.name)
85        {
86            Some(TableRuleKind::SchemaOnly) => {
87                tracing::debug!(
88                    "Excluding table '{}' from publication (schema-only)",
89                    table_identifier
90                );
91            }
92            Some(TableRuleKind::Predicate(pred)) => {
93                predicate_tables.push((fq_table, pred));
94            }
95            None => {
96                plain_tables.push(fq_table);
97            }
98        }
99    }
100
101    if plain_tables.is_empty() && predicate_tables.is_empty() {
102        bail!(
103            "No tables available for publication '{}' after applying filters and schema-only rules",
104            publication_name
105        );
106    }
107
108    let has_predicates = !predicate_tables.is_empty();
109    let server_version = get_server_version(client).await?;
110    if has_predicates && server_version < 150000 {
111        bail!(
112            "Table-level predicates require PostgreSQL 15+. Detected server version {}.\n\
113             Upgrade the source database or remove --table-filter/--time-filter for logical replication.",
114            server_version
115        );
116    }
117
118    let mut clauses = Vec::new();
119    clauses.extend(plain_tables);
120    clauses.extend(
121        predicate_tables
122            .iter()
123            .map(|(table, predicate)| format!("{} WHERE ({})", table, predicate)),
124    );
125
126    let query = format!(
127        "CREATE PUBLICATION {} FOR TABLE {}",
128        crate::utils::quote_ident(publication_name),
129        clauses.join(", ")
130    );
131
132    execute_publication_query(client, publication_name, &query).await
133}
134
135async fn execute_publication_query(
136    client: &Client,
137    publication_name: &str,
138    query: &str,
139) -> Result<()> {
140    match client.execute(query, &[]).await {
141        Ok(_) => {
142            tracing::info!("✓ Publication '{}' created successfully", publication_name);
143            Ok(())
144        }
145        Err(e) => {
146            let err_str = e.to_string();
147            // Publication might already exist - that's okay
148            if err_str.contains("already exists") {
149                tracing::info!("✓ Publication '{}' already exists", publication_name);
150                Ok(())
151            } else if err_str.contains("permission denied") || err_str.contains("must be owner") {
152                anyhow::bail!(
153                    "Permission denied: Cannot create publication '{}'.\n\
154                     You need superuser or owner privileges on the database.\n\
155                     Grant with: GRANT CREATE ON DATABASE <dbname> TO <user>;\n\
156                     Error: {}",
157                    publication_name,
158                    err_str
159                )
160            } else if err_str.contains("wal_level") || err_str.contains("logical replication") {
161                anyhow::bail!(
162                    "Logical replication not enabled: Cannot create publication '{}'.\n\
163                     The database parameter 'wal_level' must be set to 'logical'.\n\
164                     Contact your database administrator to update postgresql.conf:\n\
165                     wal_level = logical\n\
166                     Error: {}",
167                    publication_name,
168                    err_str
169                )
170            } else {
171                anyhow::bail!(
172                    "Failed to create publication '{}': {}\n\
173                     \n\
174                     Common causes:\n\
175                     - Insufficient privileges (need CREATE privilege on database)\n\
176                     - Logical replication not enabled (wal_level must be 'logical')\n\
177                     - Database does not support publications",
178                    publication_name,
179                    err_str
180                )
181            }
182        }
183    }
184}
185
186async fn get_server_version(client: &Client) -> Result<i32> {
187    let row = client
188        .query_one("SHOW server_version_num", &[])
189        .await
190        .context("Failed to query server version")?;
191    let version_str: String = row.get(0);
192    version_str.parse::<i32>().with_context(|| {
193        format!(
194            "Invalid server_version_num '{}'. Expected integer.",
195            version_str
196        )
197    })
198}
199
200/// List all publications in the database
201pub async fn list_publications(client: &Client) -> Result<Vec<String>> {
202    let rows = client
203        .query("SELECT pubname FROM pg_publication ORDER BY pubname", &[])
204        .await
205        .context("Failed to list publications")?;
206
207    let publications: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
208
209    Ok(publications)
210}
211
212/// Drop a publication
213pub async fn drop_publication(client: &Client, publication_name: &str) -> Result<()> {
214    // Validate publication name to prevent SQL injection
215    crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
216        format!(
217            "Invalid publication name '{}': must be a valid PostgreSQL identifier",
218            publication_name
219        )
220    })?;
221
222    tracing::info!("Dropping publication '{}'...", publication_name);
223
224    let query = format!(
225        "DROP PUBLICATION IF EXISTS {}",
226        crate::utils::quote_ident(publication_name)
227    );
228
229    client
230        .execute(&query, &[])
231        .await
232        .context(format!("Failed to drop publication '{}'", publication_name))?;
233
234    tracing::info!("✓ Publication '{}' dropped", publication_name);
235    Ok(())
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    /// Creates a publication with an empty filter, checks that it is listed, then
    /// drops it. Ignored by default; needs `TEST_SOURCE_URL` to point at a database.
    #[tokio::test]
    #[ignore]
    async fn test_create_and_list_publications() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let client = connect(&source_url).await.unwrap();

        let pub_name = "test_publication";
        // Assume testing on postgres database
        let db_name = "postgres";
        let filter = ReplicationFilter::empty();

        // Best-effort removal of a leftover publication from an earlier run.
        let _ = drop_publication(&client, pub_name).await;

        let outcome = create_publication(&client, db_name, pub_name, &filter).await;
        if let Err(e) = &outcome {
            println!("Error creating publication: {:?}", e);
            // If Neon doesn't support publications, skip rest of test
            let message = e.to_string();
            if message.contains("not supported") || message.contains("permission") {
                println!("Skipping test - Neon might not support publications on pooler");
                return;
            }
        } else {
            println!("✓ Publication created successfully");
        }
        assert!(outcome.is_ok(), "Failed to create publication");

        // The new publication must show up in the listing.
        let pubs = list_publications(&client).await.unwrap();
        println!("Publications: {:?}", pubs);
        assert!(pubs.iter().any(|name| name == pub_name));

        // Clean up
        drop_publication(&client, pub_name).await.unwrap();
    }

    /// Creates a publication, drops it, and checks that it is no longer listed.
    /// Ignored by default; needs `TEST_SOURCE_URL` to point at a database.
    #[tokio::test]
    #[ignore]
    async fn test_drop_publication() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let client = connect(&source_url).await.unwrap();

        let pub_name = "test_drop_publication";
        let db_name = "postgres";
        let filter = ReplicationFilter::empty();

        // Set up the publication that is about to be dropped.
        create_publication(&client, db_name, pub_name, &filter)
            .await
            .unwrap();

        let dropped = drop_publication(&client, pub_name).await;
        assert!(dropped.is_ok());

        // Verify it's gone
        let remaining = list_publications(&client).await.unwrap();
        assert!(!remaining.iter().any(|name| name == pub_name));
    }
}