database_replicator/replication/
subscription.rs

1// ABOUTME: Subscription management for logical replication on target database
2// ABOUTME: Creates and manages PostgreSQL subscriptions to receive replicated data
3
4use anyhow::{Context, Result};
5use std::time::Duration;
6use tokio_postgres::Client;
7
8/// Create a subscription to a publication on the source database
9pub async fn create_subscription(
10    client: &Client,
11    subscription_name: &str,
12    source_connection_string: &str,
13    publication_name: &str,
14) -> Result<()> {
15    // Validate subscription name to prevent SQL injection
16    crate::utils::validate_postgres_identifier(subscription_name).with_context(|| {
17        format!(
18            "Invalid subscription name '{}': must be a valid PostgreSQL identifier",
19            subscription_name
20        )
21    })?;
22
23    // Validate publication name to prevent SQL injection
24    crate::utils::validate_postgres_identifier(publication_name).with_context(|| {
25        format!(
26            "Invalid publication name '{}': must be a valid PostgreSQL identifier",
27            publication_name
28        )
29    })?;
30
31    tracing::info!("Creating subscription '{}'...", subscription_name);
32
33    // SECURITY NOTE: PostgreSQL subscriptions store connection strings (including passwords)
34    // in the pg_subscription system catalog, visible to users with access to that table.
35    //
36    // To avoid storing passwords in the catalog:
37    // 1. Configure .pgpass file on the TARGET PostgreSQL server
38    // 2. Use password-less connection string (omit password from URL)
39    // 3. The subscription will read credentials from .pgpass
40    //
41    // See: https://www.postgresql.org/docs/current/libpq-pgpass.html
42    //
43    // For now, we use the provided connection string as-is for compatibility.
44    // Users concerned about password exposure should configure .pgpass on the target server.
45
46    tracing::warn!(
47        "⚠ Security Note: Subscription connection strings are stored in pg_subscription catalog"
48    );
49    tracing::warn!(
50        "  To avoid storing passwords, configure .pgpass on the target PostgreSQL server"
51    );
52
53    let query = format!(
54        "CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {}",
55        crate::utils::quote_ident(subscription_name),
56        crate::utils::quote_literal(source_connection_string),
57        crate::utils::quote_ident(publication_name)
58    );
59
60    match client.execute(&query, &[]).await {
61        Ok(_) => {
62            tracing::info!(
63                "✓ Subscription '{}' created successfully",
64                subscription_name
65            );
66            Ok(())
67        }
68        Err(e) => {
69            let err_str = e.to_string();
70            // Subscription might already exist - that's okay
71            if err_str.contains("already exists") {
72                tracing::info!("✓ Subscription '{}' already exists", subscription_name);
73                Ok(())
74            } else if err_str.contains("permission denied") || err_str.contains("must be superuser")
75            {
76                anyhow::bail!(
77                    "Permission denied: Cannot create subscription '{}'.\n\
78                     Only superusers can create subscriptions in PostgreSQL.\n\
79                     Contact your database administrator to:\n\
80                     1. Grant superuser: ALTER ROLE <user> WITH SUPERUSER;\n\
81                     2. Or create the subscription on your behalf\n\
82                     Error: {}",
83                    subscription_name,
84                    err_str
85                )
86            } else if err_str.contains("publication") && err_str.contains("does not exist") {
87                anyhow::bail!(
88                    "Publication does not exist: Cannot create subscription '{}'.\n\
89                     The publication '{}' was not found on the source database.\n\
90                     Make sure the publication exists before creating the subscription.\n\
91                     Error: {}",
92                    subscription_name,
93                    publication_name,
94                    err_str
95                )
96            } else if err_str.contains("could not connect to the publisher")
97                || err_str.contains("connection")
98            {
99                anyhow::bail!(
100                    "Connection failed: Cannot connect to source database for subscription '{}'.\n\
101                     Please verify:\n\
102                     - The source database is accessible from the target\n\
103                     - The connection string is correct\n\
104                     - Firewall rules allow connections\n\
105                     - The source user has REPLICATION privilege\n\
106                     Error: {}",
107                    subscription_name,
108                    err_str
109                )
110            } else if err_str.contains("replication slot") {
111                anyhow::bail!(
112                    "Replication slot error: Cannot create subscription '{}'.\n\
113                     The source database may have reached the maximum number of replication slots.\n\
114                     Check 'max_replication_slots' on the source database.\n\
115                     Error: {}",
116                    subscription_name,
117                    err_str
118                )
119            } else {
120                anyhow::bail!(
121                    "Failed to create subscription '{}': {}\n\
122                     \n\
123                     Common causes:\n\
124                     - Insufficient privileges (need SUPERUSER on target)\n\
125                     - Publication does not exist on source\n\
126                     - Cannot connect to source database\n\
127                     - max_replication_slots limit reached on source",
128                    subscription_name,
129                    err_str
130                )
131            }
132        }
133    }
134}
135
136/// List all subscriptions in the database
137pub async fn list_subscriptions(client: &Client) -> Result<Vec<String>> {
138    let rows = client
139        .query("SELECT subname FROM pg_subscription ORDER BY subname", &[])
140        .await
141        .context("Failed to list subscriptions")?;
142
143    let subscriptions: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
144
145    Ok(subscriptions)
146}
147
148/// Drop a subscription
149pub async fn drop_subscription(client: &Client, subscription_name: &str) -> Result<()> {
150    // Validate subscription name to prevent SQL injection
151    crate::utils::validate_postgres_identifier(subscription_name).with_context(|| {
152        format!(
153            "Invalid subscription name '{}': must be a valid PostgreSQL identifier",
154            subscription_name
155        )
156    })?;
157
158    tracing::info!("Dropping subscription '{}'...", subscription_name);
159
160    let query = format!(
161        "DROP SUBSCRIPTION IF EXISTS {}",
162        crate::utils::quote_ident(subscription_name)
163    );
164
165    client.execute(&query, &[]).await.context(format!(
166        "Failed to drop subscription '{}'",
167        subscription_name
168    ))?;
169
170    tracing::info!("✓ Subscription '{}' dropped", subscription_name);
171    Ok(())
172}
173
174/// Subscription state enum
175#[derive(Debug, Clone, PartialEq)]
176pub enum SubscriptionState {
177    /// Subscription is streaming changes ('r' state)
178    Streaming,
179    /// Subscription is initializing ('i' state)
180    Initializing,
181    /// Subscription is copying data ('d' state)
182    Copying,
183    /// Subscription is syncing ('s' state)
184    Syncing,
185    /// Subscription has an error or is in unknown state
186    Error(String),
187    /// Subscription does not exist
188    NotFound,
189}
190
191/// Detect the current state of a subscription
192pub async fn detect_subscription_state(
193    client: &Client,
194    subscription_name: &str,
195) -> Result<SubscriptionState> {
196    // Query pg_stat_subscription to get subscription state
197    let rows = client
198        .query(
199            "SELECT srsubstate FROM pg_stat_subscription WHERE subname = $1",
200            &[&subscription_name],
201        )
202        .await
203        .context(format!(
204            "Failed to query subscription status for '{}'",
205            subscription_name
206        ))?;
207
208    if rows.is_empty() {
209        return Ok(SubscriptionState::NotFound);
210    }
211
212    let state: String = rows[0].get(0);
213
214    match state.as_str() {
215        "r" => Ok(SubscriptionState::Streaming),
216        "i" => Ok(SubscriptionState::Initializing),
217        "d" => Ok(SubscriptionState::Copying),
218        "s" => Ok(SubscriptionState::Syncing),
219        other => Ok(SubscriptionState::Error(other.to_string())),
220    }
221}
222
223/// Wait for subscription to complete initial sync and enter streaming state
224/// Returns when subscription reaches 'r' (ready/streaming) state
225pub async fn wait_for_sync(
226    client: &Client,
227    subscription_name: &str,
228    timeout_secs: u64,
229) -> Result<()> {
230    tracing::info!(
231        "Waiting for subscription '{}' to sync...",
232        subscription_name
233    );
234
235    let start = std::time::Instant::now();
236    let timeout = Duration::from_secs(timeout_secs);
237
238    loop {
239        let row = client
240            .query_one(
241                "SELECT srsubstate FROM pg_stat_subscription WHERE subname = $1",
242                &[&subscription_name],
243            )
244            .await
245            .context(format!(
246                "Failed to query subscription status for '{}'",
247                subscription_name
248            ))?;
249
250        let state: String = row.get(0);
251
252        match state.as_str() {
253            "r" => {
254                tracing::info!(
255                    "✓ Subscription '{}' is ready and streaming",
256                    subscription_name
257                );
258                return Ok(());
259            }
260            "i" => {
261                tracing::info!("Subscription '{}' is initializing...", subscription_name);
262            }
263            "d" => {
264                tracing::info!("Subscription '{}' is copying data...", subscription_name);
265            }
266            "s" => {
267                tracing::info!("Subscription '{}' is syncing...", subscription_name);
268            }
269            _ => {
270                tracing::warn!(
271                    "Subscription '{}' in unexpected state: {}",
272                    subscription_name,
273                    state
274                );
275            }
276        }
277
278        if start.elapsed() > timeout {
279            anyhow::bail!(
280                "Timeout waiting for subscription '{}' to sync after {} seconds.\n\
281                 The subscription is in state '{}' and has not reached 'ready' (streaming) state.\n\
282                 \n\
283                 Possible causes:\n\
284                 - Large database taking longer than expected to copy\n\
285                 - Network issues slowing down data transfer\n\
286                 - Source database under heavy load\n\
287                 \n\
288                 Suggestions:\n\
289                 - Increase the timeout value and try again\n\
290                 - Check replication status with 'status' command\n\
291                 - Monitor source database load and network connectivity",
292                subscription_name,
293                timeout_secs,
294                state
295            );
296        }
297
298        tokio::time::sleep(Duration::from_secs(2)).await;
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use super::*;
305    use crate::postgres::connect;
306
307    #[tokio::test]
308    #[ignore]
309    async fn test_create_and_list_subscriptions() {
310        // This test requires two databases: source and target
311        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
312        let target_url = std::env::var("TEST_TARGET_URL").unwrap();
313
314        let source_client = connect(&source_url).await.unwrap();
315        let target_client = connect(&target_url).await.unwrap();
316
317        let sub_name = "test_subscription";
318        let pub_name = "test_publication";
319        let db_name = "postgres"; // Assume testing on postgres database
320        let filter = crate::filters::ReplicationFilter::empty();
321
322        // Create publication on source
323        crate::replication::create_publication(&source_client, db_name, pub_name, &filter)
324            .await
325            .unwrap();
326
327        // Clean up subscription if exists
328        let _ = drop_subscription(&target_client, sub_name).await;
329
330        // Create subscription on target
331        let result = create_subscription(&target_client, sub_name, &source_url, pub_name).await;
332        match &result {
333            Ok(_) => println!("✓ Subscription created successfully"),
334            Err(e) => {
335                println!("Error creating subscription: {:?}", e);
336                // If target doesn't support subscriptions, skip rest of test
337                if e.to_string().contains("not supported") || e.to_string().contains("permission") {
338                    println!("Skipping test - target might not support subscriptions");
339                    return;
340                }
341            }
342        }
343        assert!(result.is_ok(), "Failed to create subscription");
344
345        // List subscriptions
346        let subs = list_subscriptions(&target_client).await.unwrap();
347        println!("Subscriptions: {:?}", subs);
348        assert!(subs.contains(&sub_name.to_string()));
349
350        // Clean up
351        drop_subscription(&target_client, sub_name).await.unwrap();
352        crate::replication::drop_publication(&source_client, pub_name)
353            .await
354            .unwrap();
355    }
356
357    #[tokio::test]
358    #[ignore]
359    async fn test_drop_subscription() {
360        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
361        let target_url = std::env::var("TEST_TARGET_URL").unwrap();
362
363        let source_client = connect(&source_url).await.unwrap();
364        let target_client = connect(&target_url).await.unwrap();
365
366        let sub_name = "test_drop_subscription";
367        let pub_name = "test_drop_publication";
368        let db_name = "postgres";
369        let filter = crate::filters::ReplicationFilter::empty();
370
371        // Create publication on source
372        crate::replication::create_publication(&source_client, db_name, pub_name, &filter)
373            .await
374            .unwrap();
375
376        // Create subscription on target
377        create_subscription(&target_client, sub_name, &source_url, pub_name)
378            .await
379            .unwrap();
380
381        // Drop it
382        let result = drop_subscription(&target_client, sub_name).await;
383        assert!(result.is_ok());
384
385        // Verify it's gone
386        let subs = list_subscriptions(&target_client).await.unwrap();
387        assert!(!subs.contains(&sub_name.to_string()));
388
389        // Clean up publication
390        crate::replication::drop_publication(&source_client, pub_name)
391            .await
392            .unwrap();
393    }
394
395    #[tokio::test]
396    #[ignore]
397    async fn test_wait_for_sync() {
398        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
399        let target_url = std::env::var("TEST_TARGET_URL").unwrap();
400
401        let source_client = connect(&source_url).await.unwrap();
402        let target_client = connect(&target_url).await.unwrap();
403
404        let sub_name = "test_wait_subscription";
405        let pub_name = "test_wait_publication";
406        let db_name = "postgres";
407        let filter = crate::filters::ReplicationFilter::empty();
408
409        // Create publication on source
410        crate::replication::create_publication(&source_client, db_name, pub_name, &filter)
411            .await
412            .unwrap();
413
414        // Clean up subscription if exists
415        let _ = drop_subscription(&target_client, sub_name).await;
416
417        // Create subscription on target
418        create_subscription(&target_client, sub_name, &source_url, pub_name)
419            .await
420            .unwrap();
421
422        // Wait for sync (30 second timeout)
423        let result = wait_for_sync(&target_client, sub_name, 30).await;
424        assert!(result.is_ok(), "Failed to wait for sync: {:?}", result);
425
426        // Clean up
427        drop_subscription(&target_client, sub_name).await.unwrap();
428        crate::replication::drop_publication(&source_client, pub_name)
429            .await
430            .unwrap();
431    }
432}