database_replicator/replication/
publication.rs1use anyhow::{bail, Context, Result};
5use tokio_postgres::Client;
6
7use crate::filters::ReplicationFilter;
8use crate::table_rules::TableRuleKind;
9
10pub async fn create_publication(
26 client: &Client,
27 db_name: &str,
28 publication_name: &str,
29 filter: &ReplicationFilter,
30) -> Result<()> {
31 crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
33 format!(
34 "Invalid publication name '{}': must be a valid PostgreSQL identifier",
35 publication_name
36 )
37 })?;
38
39 tracing::info!("Creating publication '{}'...", publication_name);
40
41 if filter.is_empty() {
42 let query = format!(
43 "CREATE PUBLICATION {} FOR ALL TABLES",
44 crate::utils::quote_ident(publication_name)
45 );
46 return execute_publication_query(client, publication_name, &query).await;
47 }
48
49 let tables = crate::migration::list_tables(client).await?;
50
51 let mut plain_tables = Vec::new();
52 let mut predicate_tables = Vec::new();
53
54 for table in tables {
55 let table_identifier = if table.schema == "public" {
57 table.name.clone()
58 } else {
59 format!("{}.{}", table.schema, table.name)
60 };
61
62 if !filter.should_replicate_table(db_name, &table_identifier) {
63 continue;
64 }
65
66 crate::utils::validate_postgres_identifier(&table.schema).with_context(|| {
68 format!(
69 "Invalid schema name '{}' for table '{}': must be a valid PostgreSQL identifier",
70 table.schema, table.name
71 )
72 })?;
73 crate::utils::validate_postgres_identifier(&table.name).with_context(|| {
74 format!(
75 "Invalid table name '{}' in schema '{}': must be a valid PostgreSQL identifier",
76 table.name, table.schema
77 )
78 })?;
79
80 let fq_table = format!("\"{}\".\"{}\"", table.schema, table.name);
81
82 match filter
83 .table_rules()
84 .rule_for_table(db_name, &table.schema, &table.name)
85 {
86 Some(TableRuleKind::SchemaOnly) => {
87 tracing::debug!(
88 "Excluding table '{}' from publication (schema-only)",
89 table_identifier
90 );
91 }
92 Some(TableRuleKind::Predicate(pred)) => {
93 predicate_tables.push((fq_table, pred));
94 }
95 None => {
96 plain_tables.push(fq_table);
97 }
98 }
99 }
100
101 if plain_tables.is_empty() && predicate_tables.is_empty() {
102 bail!(
103 "No tables available for publication '{}' after applying filters and schema-only rules",
104 publication_name
105 );
106 }
107
108 let has_predicates = !predicate_tables.is_empty();
109 let server_version = get_server_version(client).await?;
110 if has_predicates && server_version < 150000 {
111 bail!(
112 "Table-level predicates require PostgreSQL 15+. Detected server version {}.\n\
113 Upgrade the source database or remove --table-filter/--time-filter for logical replication.",
114 server_version
115 );
116 }
117
118 let mut clauses = Vec::new();
119 clauses.extend(plain_tables);
120 clauses.extend(
121 predicate_tables
122 .iter()
123 .map(|(table, predicate)| format!("{} WHERE ({})", table, predicate)),
124 );
125
126 let query = format!(
127 "CREATE PUBLICATION {} FOR TABLE {}",
128 crate::utils::quote_ident(publication_name),
129 clauses.join(", ")
130 );
131
132 execute_publication_query(client, publication_name, &query).await
133}
134
135async fn execute_publication_query(
136 client: &Client,
137 publication_name: &str,
138 query: &str,
139) -> Result<()> {
140 match client.execute(query, &[]).await {
141 Ok(_) => {
142 tracing::info!("✓ Publication '{}' created successfully", publication_name);
143 Ok(())
144 }
145 Err(e) => {
146 let err_str = e.to_string();
147 if err_str.contains("already exists") {
149 tracing::info!("✓ Publication '{}' already exists", publication_name);
150 Ok(())
151 } else if err_str.contains("permission denied") || err_str.contains("must be owner") {
152 anyhow::bail!(
153 "Permission denied: Cannot create publication '{}'.\n\
154 You need superuser or owner privileges on the database.\n\
155 Grant with: GRANT CREATE ON DATABASE <dbname> TO <user>;\n\
156 Error: {}",
157 publication_name,
158 err_str
159 )
160 } else if err_str.contains("wal_level") || err_str.contains("logical replication") {
161 anyhow::bail!(
162 "Logical replication not enabled: Cannot create publication '{}'.\n\
163 The database parameter 'wal_level' must be set to 'logical'.\n\
164 Contact your database administrator to update postgresql.conf:\n\
165 wal_level = logical\n\
166 Error: {}",
167 publication_name,
168 err_str
169 )
170 } else {
171 anyhow::bail!(
172 "Failed to create publication '{}': {}\n\
173 \n\
174 Common causes:\n\
175 - Insufficient privileges (need CREATE privilege on database)\n\
176 - Logical replication not enabled (wal_level must be 'logical')\n\
177 - Database does not support publications",
178 publication_name,
179 err_str
180 )
181 }
182 }
183 }
184}
185
186async fn get_server_version(client: &Client) -> Result<i32> {
187 let row = client
188 .query_one("SHOW server_version_num", &[])
189 .await
190 .context("Failed to query server version")?;
191 let version_str: String = row.get(0);
192 version_str.parse::<i32>().with_context(|| {
193 format!(
194 "Invalid server_version_num '{}'. Expected integer.",
195 version_str
196 )
197 })
198}
199
200pub async fn list_publications(client: &Client) -> Result<Vec<String>> {
202 let rows = client
203 .query("SELECT pubname FROM pg_publication ORDER BY pubname", &[])
204 .await
205 .context("Failed to list publications")?;
206
207 let publications: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
208
209 Ok(publications)
210}
211
212pub async fn drop_publication(client: &Client, publication_name: &str) -> Result<()> {
214 crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
216 format!(
217 "Invalid publication name '{}': must be a valid PostgreSQL identifier",
218 publication_name
219 )
220 })?;
221
222 tracing::info!("Dropping publication '{}'...", publication_name);
223
224 let query = format!(
225 "DROP PUBLICATION IF EXISTS {}",
226 crate::utils::quote_ident(publication_name)
227 );
228
229 client
230 .execute(&query, &[])
231 .await
232 .context(format!("Failed to drop publication '{}'", publication_name))?;
233
234 tracing::info!("✓ Publication '{}' dropped", publication_name);
235 Ok(())
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use crate::postgres::connect;
242
243 #[tokio::test]
244 #[ignore]
245 async fn test_create_and_list_publications() {
246 let url = std::env::var("TEST_SOURCE_URL").unwrap();
247 let client = connect(&url).await.unwrap();
248
249 let pub_name = "test_publication";
250 let db_name = "postgres"; let filter = ReplicationFilter::empty();
252
253 let _ = drop_publication(&client, pub_name).await;
255
256 let result = create_publication(&client, db_name, pub_name, &filter).await;
258 match &result {
259 Ok(_) => println!("✓ Publication created successfully"),
260 Err(e) => {
261 println!("Error creating publication: {:?}", e);
262 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
264 println!("Skipping test - Neon might not support publications on pooler");
265 return;
266 }
267 }
268 }
269 assert!(result.is_ok(), "Failed to create publication");
270
271 let pubs = list_publications(&client).await.unwrap();
273 println!("Publications: {:?}", pubs);
274 assert!(pubs.contains(&pub_name.to_string()));
275
276 drop_publication(&client, pub_name).await.unwrap();
278 }
279
280 #[tokio::test]
281 #[ignore]
282 async fn test_drop_publication() {
283 let url = std::env::var("TEST_SOURCE_URL").unwrap();
284 let client = connect(&url).await.unwrap();
285
286 let pub_name = "test_drop_publication";
287 let db_name = "postgres";
288 let filter = ReplicationFilter::empty();
289
290 create_publication(&client, db_name, pub_name, &filter)
292 .await
293 .unwrap();
294
295 let result = drop_publication(&client, pub_name).await;
297 assert!(result.is_ok());
298
299 let pubs = list_publications(&client).await.unwrap();
301 assert!(!pubs.contains(&pub_name.to_string()));
302 }
303}