database_replicator/migration/estimation.rs
// ABOUTME: Database size estimation and replication time prediction
// ABOUTME: Helps users understand resource requirements before replication

use anyhow::{Context, Result};
use std::time::Duration;
use tokio_postgres::Client;

use super::schema::DatabaseInfo;
use crate::filters::ReplicationFilter;

/// Information about a database's size and estimated replication time
#[derive(Debug, Clone)]
pub struct DatabaseSizeInfo {
    /// Database name
    pub name: String,
    /// Size in bytes
    pub size_bytes: i64,
    /// Human-readable size (e.g., "15.3 GB")
    pub size_human: String,
    /// Estimated replication duration
    pub estimated_duration: Duration,
}

/// Estimate database sizes and replication times with filtering support
///
/// Queries PostgreSQL for database sizes and calculates estimated replication times
/// based on typical dump/restore speeds. Uses a conservative estimate of 20 GB/hour
/// for total replication time (dump + restore).
///
/// When table filters are specified, connects to each database to compute the exact
/// size of the filtered tables rather than using the entire database size.
///
/// # Arguments
///
/// * `source_url` - Connection URL for the source database cluster
/// * `source_client` - Connected PostgreSQL client to the source database
/// * `databases` - List of databases to estimate
/// * `filter` - Replication filter for table inclusion/exclusion
///
/// # Returns
///
/// Returns a vector of `DatabaseSizeInfo` with size and time estimates for each database.
///
/// # Errors
///
/// This function will return an error if:
/// - Database sizes cannot be queried
/// - The database connection fails
/// - An individual database cannot be reached for table filtering
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// # use database_replicator::postgres::connect;
/// # use database_replicator::migration::{list_databases, estimate_database_sizes};
/// # use database_replicator::filters::ReplicationFilter;
/// # async fn example() -> Result<()> {
/// let url = "postgresql://user:pass@localhost:5432/postgres";
/// let client = connect(url).await?;
/// let databases = list_databases(&client).await?;
/// let filter = ReplicationFilter::empty();
/// let estimates = estimate_database_sizes(url, &client, &databases, &filter).await?;
///
/// for estimate in estimates {
///     println!("{}: {} (~{:?})", estimate.name, estimate.size_human, estimate.estimated_duration);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn estimate_database_sizes(
    source_url: &str,
    source_client: &Client,
    databases: &[DatabaseInfo],
    filter: &ReplicationFilter,
) -> Result<Vec<DatabaseSizeInfo>> {
    let mut sizes = Vec::new();

    // Check if table filtering is active
    let has_table_filter = filter.include_tables().is_some() || filter.exclude_tables().is_some();

    for db in databases {
        let size_bytes = if has_table_filter {
            // With table filtering, we need to connect to each database
            // and sum up only the filtered tables
            estimate_filtered_database_size(source_url, &db.name, filter).await?
        } else {
            // Without table filtering, use the faster pg_database_size()
            let row = source_client
                .query_one("SELECT pg_database_size($1::text)", &[&db.name])
                .await
                .context(format!("Failed to query size for database '{}'", db.name))?;
            row.get(0)
        };

        // Estimate replication time based on typical speeds
        // Conservative estimates:
        // - Dump: 25-35 GB/hour
        // - Restore: 15-25 GB/hour
        // Combined conservative estimate: 20 GB/hour total
        let estimated_duration = estimate_replication_duration(size_bytes);

        sizes.push(DatabaseSizeInfo {
            name: db.name.clone(),
            size_bytes,
            size_human: format_bytes(size_bytes),
            estimated_duration,
        });
    }

    Ok(sizes)
}

/// Estimate database size considering table filters
///
/// Connects to the specific database, lists all of its tables, applies the
/// filter rules, and sums the sizes of only the tables that pass.
///
/// # Arguments
///
/// * `source_url` - Connection URL for the source database cluster
/// * `db_name` - Name of the database to estimate
/// * `filter` - Replication filter for table inclusion/exclusion
///
/// # Returns
///
/// Total size in bytes of all filtered tables in the database
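///
/// # Filtering semantics
///
/// Tables in the `public` schema are matched by bare name; tables in any
/// other schema are matched as `"schema.table"`. For example (with
/// illustrative filter rules), a filter that includes only `users` and
/// `audit.events` for this database would count `public.users` and
/// `audit.events`, and skip `public.orders`.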
async fn estimate_filtered_database_size(
    source_url: &str,
    db_name: &str,
    filter: &ReplicationFilter,
) -> Result<i64> {
    // Build connection URL for this specific database
    let db_url = replace_database_in_url(source_url, db_name)?;
    let client = crate::postgres::connect(&db_url)
        .await
        .context(format!("Failed to connect to database '{}'", db_name))?;

    // Get all tables in the database
    let tables = super::schema::list_tables(&client).await?;

    // Filter tables based on filter rules
    let filtered_tables: Vec<_> = tables
        .into_iter()
        .filter(|table| {
            // Tables in the public schema are matched by bare name;
            // all others use the "schema.table" format
            let table_name = if table.schema == "public" {
                table.name.clone()
            } else {
                format!("{}.{}", table.schema, table.name)
            };
            filter.should_replicate_table(db_name, &table_name)
        })
        .collect();

    // Sum up sizes of filtered tables
    let mut total_size: i64 = 0;
    for table in filtered_tables {
        // Use pg_total_relation_size to include indexes, TOAST, etc.
        // Quote identifiers server-side with format('%I.%I', ...) so that
        // mixed-case or otherwise unusual names cast cleanly to regclass.
        let row = client
            .query_one(
                "SELECT pg_total_relation_size(format('%I.%I', $1::text, $2::text)::regclass)",
                &[&table.schema, &table.name],
            )
            .await
            .context(format!(
                "Failed to query size for table '{}.{}'",
                table.schema, table.name
            ))?;

        let table_size: i64 = row.get(0);
        total_size += table_size;
    }

    Ok(total_size)
}

/// Replace the database name in a connection URL
///
/// Assumes the URL has the form `postgresql://user:pass@host:port/database?params`,
/// i.e. the database name is the final path segment. Returns an error if the
/// URL contains no path segment at all.
///
/// # Arguments
///
/// * `url` - Original connection URL
/// * `new_database` - New database name to use
///
/// # Returns
///
/// Updated connection URL with the new database name
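///
/// # Examples
///
/// Illustrative only (this function is private, so the block is not run as a doctest):
///
/// ```ignore
/// let url = "postgresql://user:pass@localhost:5432/postgres?sslmode=require";
/// let rewritten = replace_database_in_url(url, "analytics")?;
/// assert_eq!(
///     rewritten,
///     "postgresql://user:pass@localhost:5432/analytics?sslmode=require"
/// );
/// ```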
fn replace_database_in_url(url: &str, new_database: &str) -> Result<String> {
    // Parse URL to find the database name
    // Format: postgresql://user:pass@host:port/database?params

    // Split off the query string, if any, at the first '?'
    let mut parts = url.splitn(2, '?');
    let base_url = parts.next().unwrap_or(url);
    let params = parts.next();

    // Split the base on the last '/' to isolate the database name
    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
    if url_parts.len() != 2 {
        anyhow::bail!("Invalid connection URL format: expected a database path segment");
    }

    // Reconstruct the URL with the new database name
    let mut new_url = format!("{}/{}", url_parts[1], new_database);
    if let Some(p) = params {
        new_url = format!("{}?{}", new_url, p);
    }

    Ok(new_url)
}

/// Estimate replication duration based on database size
///
/// Uses a conservative estimate of 20 GB/hour for total replication time,
/// which accounts for both dump and restore operations.
///
/// # Arguments
///
/// * `size_bytes` - Database size in bytes
///
/// # Returns
///
/// Estimated duration for complete replication (dump + restore)
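///
/// For example, a 50 GB database works out to 50 / 20 = 2.5 hours,
/// i.e. a `Duration` of 9,000 seconds.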
fn estimate_replication_duration(size_bytes: i64) -> Duration {
    // Conservative estimate: 20 GB/hour total (dump + restore)
    const BYTES_PER_HOUR: f64 = 20.0 * 1024.0 * 1024.0 * 1024.0; // 20 GB

    let hours = size_bytes as f64 / BYTES_PER_HOUR;
    Duration::from_secs_f64(hours * 3600.0)
}

/// Format bytes into human-readable string
///
/// Converts byte count into appropriate units (B, KB, MB, GB, TB)
/// with one decimal place of precision.
///
/// # Arguments
///
/// * `bytes` - Number of bytes to format
///
/// # Returns
///
/// Human-readable string (e.g., "15.3 GB", "2.1 MB")
///
/// # Examples
///
/// ```
/// # use database_replicator::migration::format_bytes;
/// assert_eq!(format_bytes(1024), "1.0 KB");
/// assert_eq!(format_bytes(1536), "1.5 KB");
/// assert_eq!(format_bytes(1073741824), "1.0 GB");
/// assert_eq!(format_bytes(16106127360), "15.0 GB");
/// ```
pub fn format_bytes(bytes: i64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let mut size = bytes as f64;
    let mut unit_idx = 0;

    while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
        size /= 1024.0;
        unit_idx += 1;
    }

    format!("{:.1} {}", size, UNITS[unit_idx])
}

/// Format duration into human-readable string
///
/// Converts duration into appropriate units (seconds, minutes, hours, days)
/// with reasonable precision.
///
/// # Arguments
///
/// * `duration` - Duration to format
///
/// # Returns
///
/// Human-readable string (e.g., "~2.5 hours", "~45 minutes", "~3 days")
///
/// # Examples
///
/// ```
/// # use std::time::Duration;
/// # use database_replicator::migration::format_duration;
/// assert_eq!(format_duration(Duration::from_secs(45)), "~45 seconds");
/// assert_eq!(format_duration(Duration::from_secs(120)), "~2.0 minutes");
/// assert_eq!(format_duration(Duration::from_secs(3600)), "~1.0 hours");
/// assert_eq!(format_duration(Duration::from_secs(7200)), "~2.0 hours");
/// ```
pub fn format_duration(duration: Duration) -> String {
    let secs = duration.as_secs();

    if secs < 60 {
        format!("~{} seconds", secs)
    } else if secs < 3600 {
        let minutes = secs as f64 / 60.0;
        format!("~{:.1} minutes", minutes)
    } else if secs < 86400 {
        let hours = secs as f64 / 3600.0;
        format!("~{:.1} hours", hours)
    } else {
        let days = secs as f64 / 86400.0;
        format!("~{:.1} days", days)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(0), "0.0 B");
        assert_eq!(format_bytes(500), "500.0 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1048576), "1.0 MB");
        assert_eq!(format_bytes(1073741824), "1.0 GB");
        assert_eq!(format_bytes(16106127360), "15.0 GB");
        assert_eq!(format_bytes(1099511627776), "1.0 TB");
    }

    #[test]
    fn test_format_duration() {
        assert_eq!(format_duration(Duration::from_secs(30)), "~30 seconds");
        assert_eq!(format_duration(Duration::from_secs(59)), "~59 seconds");
        assert_eq!(format_duration(Duration::from_secs(60)), "~1.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(120)), "~2.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(3599)), "~60.0 minutes");
        assert_eq!(format_duration(Duration::from_secs(3600)), "~1.0 hours");
        assert_eq!(format_duration(Duration::from_secs(7200)), "~2.0 hours");
        assert_eq!(format_duration(Duration::from_secs(86400)), "~1.0 days");
        assert_eq!(format_duration(Duration::from_secs(172800)), "~2.0 days");
    }

    #[test]
    fn test_estimate_replication_duration() {
        // 1 GB should take ~3 minutes (20 GB/hour = 0.05 hours for 1 GB)
        let duration = estimate_replication_duration(1073741824);
        assert!(duration.as_secs() >= 170 && duration.as_secs() <= 190);

        // 20 GB should take ~1 hour
        let duration = estimate_replication_duration(21474836480);
        assert!(duration.as_secs() >= 3500 && duration.as_secs() <= 3700);
    }
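
    // A sketch of a regression test for replace_database_in_url, derived from
    // the URL format the function documents; extend with project-specific
    // cases as needed.
    #[test]
    fn test_replace_database_in_url() {
        // Query parameters are preserved
        let url = "postgresql://user:pass@localhost:5432/postgres?sslmode=require";
        let replaced = replace_database_in_url(url, "analytics").unwrap();
        assert_eq!(
            replaced,
            "postgresql://user:pass@localhost:5432/analytics?sslmode=require"
        );

        // Without query parameters
        let url = "postgresql://user:pass@localhost:5432/postgres";
        let replaced = replace_database_in_url(url, "analytics").unwrap();
        assert_eq!(replaced, "postgresql://user:pass@localhost:5432/analytics");

        // A string with no path segment at all is rejected
        assert!(replace_database_in_url("not-a-url", "analytics").is_err());
    }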
}