database_replicator/replication/
subscription.rs1use anyhow::{Context, Result};
5use std::time::Duration;
6use tokio_postgres::Client;
7
8pub async fn create_subscription(
10 client: &Client,
11 subscription_name: &str,
12 source_connection_string: &str,
13 publication_name: &str,
14) -> Result<()> {
15 crate::utils::validate_postgres_identifier(subscription_name).with_context(|| {
17 format!(
18 "Invalid subscription name '{}': must be a valid PostgreSQL identifier",
19 subscription_name
20 )
21 })?;
22
23 crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
25 format!(
26 "Invalid publication name '{}': must be a valid PostgreSQL identifier",
27 publication_name
28 )
29 })?;
30
31 tracing::info!("Creating subscription '{}'...", subscription_name);
32
33 tracing::warn!(
47 "⚠ Security Note: Subscription connection strings are stored in pg_subscription catalog"
48 );
49 tracing::warn!(
50 " To avoid storing passwords, configure .pgpass on the target PostgreSQL server"
51 );
52
53 let query = format!(
54 "CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {}",
55 crate::utils::quote_ident(subscription_name),
56 crate::utils::quote_literal(source_connection_string),
57 crate::utils::quote_ident(publication_name)
58 );
59
60 match client.execute(&query, &[]).await {
61 Ok(_) => {
62 tracing::info!(
63 "✓ Subscription '{}' created successfully",
64 subscription_name
65 );
66 Ok(())
67 }
68 Err(e) => {
69 let err_str = e.to_string();
70 if err_str.contains("already exists") {
72 tracing::info!("✓ Subscription '{}' already exists", subscription_name);
73 Ok(())
74 } else if err_str.contains("permission denied") || err_str.contains("must be superuser")
75 {
76 anyhow::bail!(
77 "Permission denied: Cannot create subscription '{}'.\n\
78 Only superusers can create subscriptions in PostgreSQL.\n\
79 Contact your database administrator to:\n\
80 1. Grant superuser: ALTER ROLE <user> WITH SUPERUSER;\n\
81 2. Or create the subscription on your behalf\n\
82 Error: {}",
83 subscription_name,
84 err_str
85 )
86 } else if err_str.contains("publication") && err_str.contains("does not exist") {
87 anyhow::bail!(
88 "Publication does not exist: Cannot create subscription '{}'.\n\
89 The publication '{}' was not found on the source database.\n\
90 Make sure the publication exists before creating the subscription.\n\
91 Error: {}",
92 subscription_name,
93 publication_name,
94 err_str
95 )
96 } else if err_str.contains("could not connect to the publisher")
97 || err_str.contains("connection")
98 {
99 anyhow::bail!(
100 "Connection failed: Cannot connect to source database for subscription '{}'.\n\
101 Please verify:\n\
102 - The source database is accessible from the target\n\
103 - The connection string is correct\n\
104 - Firewall rules allow connections\n\
105 - The source user has REPLICATION privilege\n\
106 Error: {}",
107 subscription_name,
108 err_str
109 )
110 } else if err_str.contains("replication slot") {
111 anyhow::bail!(
112 "Replication slot error: Cannot create subscription '{}'.\n\
113 The source database may have reached the maximum number of replication slots.\n\
114 Check 'max_replication_slots' on the source database.\n\
115 Error: {}",
116 subscription_name,
117 err_str
118 )
119 } else {
120 anyhow::bail!(
121 "Failed to create subscription '{}': {}\n\
122 \n\
123 Common causes:\n\
124 - Insufficient privileges (need SUPERUSER on target)\n\
125 - Publication does not exist on source\n\
126 - Cannot connect to source database\n\
127 - max_replication_slots limit reached on source",
128 subscription_name,
129 err_str
130 )
131 }
132 }
133 }
134}
135
136pub async fn list_subscriptions(client: &Client) -> Result<Vec<String>> {
138 let rows = client
139 .query("SELECT subname FROM pg_subscription ORDER BY subname", &[])
140 .await
141 .context("Failed to list subscriptions")?;
142
143 let subscriptions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
144
145 Ok(subscriptions)
146}
147
148pub async fn drop_subscription(client: &Client, subscription_name: &str) -> Result<()> {
150 crate::utils::validate_postgres_identifier(subscription_name).with_context(|| {
152 format!(
153 "Invalid subscription name '{}': must be a valid PostgreSQL identifier",
154 subscription_name
155 )
156 })?;
157
158 tracing::info!("Dropping subscription '{}'...", subscription_name);
159
160 let query = format!(
161 "DROP SUBSCRIPTION IF EXISTS {}",
162 crate::utils::quote_ident(subscription_name)
163 );
164
165 client.execute(&query, &[]).await.context(format!(
166 "Failed to drop subscription '{}'",
167 subscription_name
168 ))?;
169
170 tracing::info!("✓ Subscription '{}' dropped", subscription_name);
171 Ok(())
172}
173
/// Coarse synchronization state of a subscription, as reported by
/// `detect_subscription_state`.
///
/// The variants are derived from the single-letter state codes PostgreSQL
/// stores for a subscription's tables.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionState {
    /// Initial synchronization is complete and changes are being replicated
    /// normally (state code `r`).
    Streaming,
    /// Synchronization has been set up but the copy has not started
    /// (state code `i`).
    Initializing,
    /// Existing table data is being copied (state code `d`).
    Copying,
    /// Synchronization is in its catch-up phase after the copy
    /// (e.g. state code `s`).
    Syncing,
    /// A state code that is not recognized; the payload is the raw code.
    Error(String),
    /// No subscription with the requested name was found.
    NotFound,
}
190
191pub async fn detect_subscription_state(
193 client: &Client,
194 subscription_name: &str,
195) -> Result<SubscriptionState> {
196 let rows = client
198 .query(
199 "SELECT srsubstate FROM pg_stat_subscription WHERE subname = $1",
200 &[&subscription_name],
201 )
202 .await
203 .context(format!(
204 "Failed to query subscription status for '{}'",
205 subscription_name
206 ))?;
207
208 if rows.is_empty() {
209 return Ok(SubscriptionState::NotFound);
210 }
211
212 let state: String = rows[0].get(0);
213
214 match state.as_str() {
215 "r" => Ok(SubscriptionState::Streaming),
216 "i" => Ok(SubscriptionState::Initializing),
217 "d" => Ok(SubscriptionState::Copying),
218 "s" => Ok(SubscriptionState::Syncing),
219 other => Ok(SubscriptionState::Error(other.to_string())),
220 }
221}
222
223pub async fn wait_for_sync(
226 client: &Client,
227 subscription_name: &str,
228 timeout_secs: u64,
229) -> Result<()> {
230 tracing::info!(
231 "Waiting for subscription '{}' to sync...",
232 subscription_name
233 );
234
235 let start = std::time::Instant::now();
236 let timeout = Duration::from_secs(timeout_secs);
237
238 loop {
239 let row = client
240 .query_one(
241 "SELECT srsubstate FROM pg_stat_subscription WHERE subname = $1",
242 &[&subscription_name],
243 )
244 .await
245 .context(format!(
246 "Failed to query subscription status for '{}'",
247 subscription_name
248 ))?;
249
250 let state: String = row.get(0);
251
252 match state.as_str() {
253 "r" => {
254 tracing::info!(
255 "✓ Subscription '{}' is ready and streaming",
256 subscription_name
257 );
258 return Ok(());
259 }
260 "i" => {
261 tracing::info!("Subscription '{}' is initializing...", subscription_name);
262 }
263 "d" => {
264 tracing::info!("Subscription '{}' is copying data...", subscription_name);
265 }
266 "s" => {
267 tracing::info!("Subscription '{}' is syncing...", subscription_name);
268 }
269 _ => {
270 tracing::warn!(
271 "Subscription '{}' in unexpected state: {}",
272 subscription_name,
273 state
274 );
275 }
276 }
277
278 if start.elapsed() > timeout {
279 anyhow::bail!(
280 "Timeout waiting for subscription '{}' to sync after {} seconds.\n\
281 The subscription is in state '{}' and has not reached 'ready' (streaming) state.\n\
282 \n\
283 Possible causes:\n\
284 - Large database taking longer than expected to copy\n\
285 - Network issues slowing down data transfer\n\
286 - Source database under heavy load\n\
287 \n\
288 Suggestions:\n\
289 - Increase the timeout value and try again\n\
290 - Check replication status with 'status' command\n\
291 - Monitor source database load and network connectivity",
292 subscription_name,
293 timeout_secs,
294 state
295 );
296 }
297
298 tokio::time::sleep(Duration::from_secs(2)).await;
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    // These are integration tests: they need live source and target databases
    // reachable through TEST_SOURCE_URL / TEST_TARGET_URL, so they are ignored
    // by default.

    /// Creates a subscription, checks that it shows up in the listing, then
    /// cleans up. Bails out early when the target rejects subscriptions.
    #[tokio::test]
    #[ignore]
    async fn test_create_and_list_subscriptions() {
        let publisher_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let subscriber_url = std::env::var("TEST_TARGET_URL").unwrap();

        let publisher = connect(&publisher_url).await.unwrap();
        let subscriber = connect(&subscriber_url).await.unwrap();

        let sub_name = "test_subscription";
        let pub_name = "test_publication";
        let db_name = "postgres";
        let filter = crate::filters::ReplicationFilter::empty();

        crate::replication::create_publication(&publisher, db_name, pub_name, &filter)
            .await
            .unwrap();

        // Best-effort removal of leftovers from an earlier run.
        let _ = drop_subscription(&subscriber, sub_name).await;

        let outcome = create_subscription(&subscriber, sub_name, &publisher_url, pub_name).await;
        if let Err(failure) = &outcome {
            println!("Error creating subscription: {:?}", failure);
            let unsupported = failure.to_string().contains("not supported")
                || failure.to_string().contains("permission");
            if unsupported {
                println!("Skipping test - target might not support subscriptions");
                return;
            }
        } else {
            println!("✓ Subscription created successfully");
        }
        assert!(outcome.is_ok(), "Failed to create subscription");

        let listed = list_subscriptions(&subscriber).await.unwrap();
        println!("Subscriptions: {:?}", listed);
        assert!(listed.iter().any(|name| name.as_str() == sub_name));

        drop_subscription(&subscriber, sub_name).await.unwrap();
        crate::replication::drop_publication(&publisher, pub_name)
            .await
            .unwrap();
    }

    /// Drops a freshly created subscription and checks it is no longer listed.
    #[tokio::test]
    #[ignore]
    async fn test_drop_subscription() {
        let publisher_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let subscriber_url = std::env::var("TEST_TARGET_URL").unwrap();

        let publisher = connect(&publisher_url).await.unwrap();
        let subscriber = connect(&subscriber_url).await.unwrap();

        let sub_name = "test_drop_subscription";
        let pub_name = "test_drop_publication";
        let db_name = "postgres";
        let filter = crate::filters::ReplicationFilter::empty();

        crate::replication::create_publication(&publisher, db_name, pub_name, &filter)
            .await
            .unwrap();

        create_subscription(&subscriber, sub_name, &publisher_url, pub_name)
            .await
            .unwrap();

        let dropped = drop_subscription(&subscriber, sub_name).await;
        assert!(dropped.is_ok());

        let remaining = list_subscriptions(&subscriber).await.unwrap();
        assert!(remaining.iter().all(|name| name.as_str() != sub_name));

        crate::replication::drop_publication(&publisher, pub_name)
            .await
            .unwrap();
    }

    /// Creates a subscription and waits up to 30 seconds for it to sync.
    #[tokio::test]
    #[ignore]
    async fn test_wait_for_sync() {
        let publisher_url = std::env::var("TEST_SOURCE_URL").unwrap();
        let subscriber_url = std::env::var("TEST_TARGET_URL").unwrap();

        let publisher = connect(&publisher_url).await.unwrap();
        let subscriber = connect(&subscriber_url).await.unwrap();

        let sub_name = "test_wait_subscription";
        let pub_name = "test_wait_publication";
        let db_name = "postgres";
        let filter = crate::filters::ReplicationFilter::empty();

        crate::replication::create_publication(&publisher, db_name, pub_name, &filter)
            .await
            .unwrap();

        // Best-effort removal of leftovers from an earlier run.
        let _ = drop_subscription(&subscriber, sub_name).await;

        create_subscription(&subscriber, sub_name, &publisher_url, pub_name)
            .await
            .unwrap();

        let result = wait_for_sync(&subscriber, sub_name, 30).await;
        assert!(result.is_ok(), "Failed to wait for sync: {:?}", result);

        drop_subscription(&subscriber, sub_name).await.unwrap();
        crate::replication::drop_publication(&publisher, pub_name)
            .await
            .unwrap();
    }
}