database_replicator/migration/estimation.rs

// ABOUTME: Database size estimation and replication time prediction
// ABOUTME: Helps users understand resource requirements before replication

use anyhow::{Context, Result};
use std::time::Duration;
use tokio_postgres::Client;

use super::schema::DatabaseInfo;
use crate::filters::ReplicationFilter;

/// Information about a database's size and estimated replication time
#[derive(Debug, Clone)]
pub struct DatabaseSizeInfo {
    /// Database name
    pub name: String,
    /// Size in bytes
    pub size_bytes: i64,
    /// Human-readable size (e.g., "15.3 GB")
    pub size_human: String,
    /// Estimated replication duration
    pub estimated_duration: Duration,
}

/// Estimate database sizes and replication times with filtering support
///
/// Queries PostgreSQL for database sizes and calculates estimated replication times
/// based on typical dump/restore speeds. Uses a conservative estimate of 20 GB/hour
/// for total replication time (dump + restore).
///
/// When table filters are specified, connects to each database to compute the exact
/// size of the filtered tables rather than using the entire database size.
///
/// # Arguments
///
/// * `source_url` - Connection URL for the source database cluster
/// * `source_client` - Connected PostgreSQL client for the source cluster
/// * `databases` - List of databases to estimate
/// * `filter` - Replication filter for table inclusion/exclusion
///
/// # Returns
///
/// Returns a vector of `DatabaseSizeInfo` with size and time estimates for each database.
///
/// # Errors
///
/// This function will return an error if:
/// - the database size query fails
/// - a database connection fails
/// - an individual database cannot be reached for table filtering
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::postgres::connect;
/// # use database_replicator::migration::{list_databases, estimate_database_sizes, format_duration};
/// # use database_replicator::filters::ReplicationFilter;
/// # async fn example() -> Result<()> {
/// let url = "postgresql://user:pass@localhost:5432/postgres";
/// let client = connect(url).await?;
/// let databases = list_databases(&client).await?;
/// let filter = ReplicationFilter::empty();
/// let estimates = estimate_database_sizes(url, &client, &databases, &filter).await?;
///
/// for estimate in estimates {
///     println!("{}: {} ({})", estimate.name, estimate.size_human, format_duration(estimate.estimated_duration));
/// }
/// # Ok(())
/// # }
/// ```
pub async fn estimate_database_sizes(
    source_url: &str,
    source_client: &Client,
    databases: &[DatabaseInfo],
    filter: &ReplicationFilter,
) -> Result<Vec<DatabaseSizeInfo>> {
    let mut sizes = Vec::new();

    // Check if table filtering is active
    let has_table_filter = filter.include_tables().is_some() || filter.exclude_tables().is_some();

    for db in databases {
        let size_bytes = if has_table_filter {
            // With table filtering, we need to connect to each database
            // and sum up only the filtered tables
            estimate_filtered_database_size(source_url, &db.name, filter).await?
        } else {
            // Without table filtering, use the faster pg_database_size()
            let row = source_client
                .query_one("SELECT pg_database_size($1::text)", &[&db.name])
                .await
                .with_context(|| format!("Failed to query size for database '{}'", db.name))?;
            row.get(0)
        };

        // Estimate replication time based on typical speeds
        // Conservative estimates:
        // - Dump: 25-35 GB/hour
        // - Restore: 15-25 GB/hour
        // Combined conservative estimate: 20 GB/hour total
        let estimated_duration = estimate_replication_duration(size_bytes);

        sizes.push(DatabaseSizeInfo {
            name: db.name.clone(),
            size_bytes,
            size_human: format_bytes(size_bytes),
            estimated_duration,
        });
    }

    Ok(sizes)
}

/// Estimate database size considering table filters
///
/// Connects to the specific database, gets all tables, filters them,
/// and sums up the sizes of only the filtered tables.
///
/// # Arguments
///
/// * `source_url` - Connection URL for the source database cluster
/// * `db_name` - Name of the database to estimate
/// * `filter` - Replication filter for table inclusion/exclusion
///
/// # Returns
///
/// Total size in bytes of all filtered tables in the database
async fn estimate_filtered_database_size(
    source_url: &str,
    db_name: &str,
    filter: &ReplicationFilter,
) -> Result<i64> {
    // Build connection URL for this specific database
    let db_url = replace_database_in_url(source_url, db_name)?;
    let client = crate::postgres::connect(&db_url).await?;

    // Get all tables in the database
    let tables = super::schema::list_tables(&client).await?;

    // Filter tables based on filter rules
    let filtered_tables: Vec<_> = tables
        .into_iter()
        .filter(|table| {
            // Build the table name the way the filter expects it: bare for
            // the public schema, schema-qualified ("schema.table") otherwise
            let table_name = if table.schema == "public" {
                table.name.clone()
            } else {
                format!("{}.{}", table.schema, table.name)
            };
            filter.should_replicate_table(db_name, &table_name)
        })
        .collect();

    // Sum up sizes of filtered tables
    let mut total_size: i64 = 0;
    for table in filtered_tables {
        // Use pg_total_relation_size to include indexes, TOAST, etc.
        // format('%I.%I', ...) quotes the identifiers server-side so that
        // names needing quoting still resolve to a valid regclass.
        let row = client
            .query_one(
                "SELECT pg_total_relation_size(format('%I.%I', $1::text, $2::text)::regclass)",
                &[&table.schema, &table.name],
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to query size for table '{}.{}'",
                    table.schema, table.name
                )
            })?;

        let table_size: i64 = row.get(0);
        total_size += table_size;
    }

    Ok(total_size)
}

/// Replace the database name in a connection URL
///
/// Uses naive string parsing and assumes the URL ends in a `/database`
/// path segment, optionally followed by `?params`.
///
/// # Arguments
///
/// * `url` - Original connection URL
/// * `new_database` - New database name to use
///
/// # Returns
///
/// Updated connection URL with the new database name
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // Parse the URL to find the database name
    // Format: postgresql://user:pass@host:port/database?params

    // Split off the query string, if any
    let (base_url, params) = match url.split_once('?') {
        Some((base, query)) => (base, Some(query)),
        None => (url, None),
    };

    // Split base by the last '/' to get everything before the database name
    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format");
    }

    // Reconstruct URL with new database name
    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

/// Estimate replication duration based on database size
///
/// Uses a conservative estimate of 20 GB/hour for total replication time,
/// which accounts for both dump and restore operations.
///
/// # Arguments
///
/// * `size_bytes` - Database size in bytes
///
/// # Returns
///
/// Estimated duration for complete replication (dump + restore)
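///
/// For example, a 50 GB database yields 50 / 20 = 2.5 hours.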
fn estimate_replication_duration(size_bytes: i64) -> Duration {
    // Conservative estimate: 20 GB/hour total (dump + restore)
    const BYTES_PER_HOUR: f64 = 20.0 * 1024.0 * 1024.0 * 1024.0; // 20 GB

    let hours = size_bytes as f64 / BYTES_PER_HOUR;
    Duration::from_secs_f64(hours * 3600.0)
}

/// Format bytes into human-readable string
///
/// Converts byte count into appropriate units (B, KB, MB, GB, TB)
/// with one decimal place of precision.
///
/// # Arguments
///
/// * `bytes` - Number of bytes to format
///
/// # Returns
///
/// Human-readable string (e.g., "15.3 GB", "2.1 MB")
///
/// # Examples
///
/// ```
/// # use database_replicator::migration::format_bytes;
/// assert_eq!(format_bytes(1024), "1.0 KB");
/// assert_eq!(format_bytes(1536), "1.5 KB");
/// assert_eq!(format_bytes(1073741824), "1.0 GB");
/// assert_eq!(format_bytes(16106127360), "15.0 GB");
/// ```
pub fn format_bytes(bytes: i64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let mut size = bytes as f64;
    let mut unit_idx = 0;

    while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
        size /= 1024.0;
        unit_idx += 1;
    }

    format!("{:.1} {}", size, UNITS[unit_idx])
}

/// Format duration into human-readable string
///
/// Converts duration into appropriate units (seconds, minutes, hours, days)
/// with reasonable precision.
///
/// # Arguments
///
/// * `duration` - Duration to format
///
/// # Returns
///
/// Human-readable string (e.g., "~2.5 hours", "~45 minutes", "~3 days")
///
/// # Examples
///
/// ```
/// # use std::time::Duration;
/// # use database_replicator::migration::format_duration;
/// assert_eq!(format_duration(Duration::from_secs(45)), "~45 seconds");
/// assert_eq!(format_duration(Duration::from_secs(120)), "~2.0 minutes");
/// assert_eq!(format_duration(Duration::from_secs(3600)), "~1.0 hours");
/// assert_eq!(format_duration(Duration::from_secs(7200)), "~2.0 hours");
/// ```
pub fn format_duration(duration: Duration) -> String {
    let secs = duration.as_secs();

    if secs < 60 {
        format!("~{} seconds", secs)
    } else if secs < 3600 {
        let minutes = secs as f64 / 60.0;
        format!("~{:.1} minutes", minutes)
    } else if secs < 86400 {
        let hours = secs as f64 / 3600.0;
        format!("~{:.1} hours", hours)
    } else {
        let days = secs as f64 / 86400.0;
        format!("~{:.1} days", days)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(0), "0.0 B");
        assert_eq!(format_bytes(500), "500.0 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1048576), "1.0 MB");
        assert_eq!(format_bytes(1073741824), "1.0 GB");
        assert_eq!(format_bytes(16106127360), "15.0 GB");
        assert_eq!(format_bytes(1099511627776), "1.0 TB");
    }

    #[test]
    fn test_format_duration() {
        assert_eq!(format_duration(Duration::from_secs(30)), "~30 seconds");
        assert_eq!(format_duration(Duration::from_secs(59)), "~59 seconds");
        assert_eq!(format_duration(Duration::from_secs(60)), "~1.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(120)), "~2.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(3599)), "~60.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(3600)), "~1.0 hours");
        assert_eq!(format_duration(Duration::from_secs(7200)), "~2.0 hours");
        assert_eq!(format_duration(Duration::from_secs(86400)), "~1.0 days");
        assert_eq!(format_duration(Duration::from_secs(172800)), "~2.0 days");
    }

    #[test]
    fn test_estimate_replication_duration() {
        // 1 GB should take ~3 minutes (20 GB/hour = 0.05 hours for 1 GB)
        let duration = estimate_replication_duration(1073741824);
        assert!(duration.as_secs() >= 170 && duration.as_secs() <= 190);

        // 20 GB should take ~1 hour
        let duration = estimate_replication_duration(21474836480);
        assert!(duration.as_secs() >= 3500 && duration.as_secs() <= 3700);
    }
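
    // Sanity checks for the URL-rewriting helper. The expected values follow
    // from the naive parsing documented on the function: swap the last path
    // segment and preserve any query string.
    #[test]
    fn test_replace_database_in_url() {
        assert_eq!(
            replace_database_in_url("postgresql://user:pass@localhost:5432/postgres", "mydb")
                .unwrap(),
            "postgresql://user:pass@localhost:5432/mydb"
        );
        assert_eq!(
            replace_database_in_url("postgresql://localhost:5432/db?sslmode=require", "mydb")
                .unwrap(),
            "postgresql://localhost:5432/mydb?sslmode=require"
        );
    }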
}