Skip to main content

dataprof_db/
streaming.rs

1//! Streaming utilities for processing large database result sets
2
3use crate::DataProfilerError;
4use std::collections::HashMap;
5
/// Tuning parameters for streaming database results in batches.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Batch size. NOTE(review): presumably the number of rows fetched per batch — confirm
    /// against the callers that consume this config.
    pub batch_size: usize,
    /// Memory budget expressed in megabytes. NOTE(review): presumably forwarded to
    /// `apply_sampling_if_needed`, which treats one megabyte as 1_048_576 bytes.
    pub max_memory_mb: usize,
    /// Optional progress hook taking two `u64` values. NOTE(review): argument meaning is not
    /// established in this module — presumably (rows processed, total rows).
    pub progress_callback: Option<fn(u64, u64)>,
}

impl Default for StreamingConfig {
    /// Batches of 10 000, a 512 MB memory budget, and no progress callback.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 10_000;
        const DEFAULT_MAX_MEMORY_MB: usize = 512;

        StreamingConfig {
            progress_callback: None,
            max_memory_mb: DEFAULT_MAX_MEMORY_MB,
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }
}
23
24/// Utility to merge multiple batches of column data
25pub fn merge_column_batches(
26    batches: Vec<HashMap<String, Vec<String>>>,
27) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
28    if batches.is_empty() {
29        return Ok(HashMap::new());
30    }
31
32    let mut merged: HashMap<String, Vec<String>> = HashMap::new();
33
34    for batch in batches {
35        for (column_name, column_data) in batch {
36            merged.entry(column_name).or_default().extend(column_data);
37        }
38    }
39
40    Ok(merged)
41}
42
/// Rough byte count for a set of columns.
///
/// Adds each column name's length to the lengths of all of that column's values
/// (`String::len`, i.e. UTF-8 bytes). Allocation overhead such as `String`/`Vec` headers and
/// spare capacity is not counted, so the true heap footprint is larger.
pub fn estimate_memory_usage(columns: &HashMap<String, Vec<String>>) -> usize {
    let mut bytes = 0usize;
    for (name, values) in columns {
        bytes += name.len();
        for value in values {
            bytes += value.len();
        }
    }
    bytes
}
50
51/// Sample large datasets to fit within memory constraints
52pub fn apply_sampling_if_needed(
53    mut columns: HashMap<String, Vec<String>>,
54    max_memory_mb: usize,
55    sampling_ratio: f64,
56) -> Result<(HashMap<String, Vec<String>>, bool), DataProfilerError> {
57    let memory_usage_bytes = estimate_memory_usage(&columns);
58    let memory_usage_mb = memory_usage_bytes / 1_048_576;
59
60    if memory_usage_mb <= max_memory_mb {
61        return Ok((columns, false));
62    }
63
64    let total_rows = columns.values().next().map(|v| v.len()).unwrap_or(0);
65    let target_rows = (total_rows as f64 * sampling_ratio) as usize;
66
67    if target_rows == 0 {
68        return Ok((HashMap::new(), true));
69    }
70
71    let step = total_rows / target_rows;
72    if step <= 1 {
73        return Ok((columns, false));
74    }
75
76    for column_data in columns.values_mut() {
77        let sampled: Vec<String> = column_data
78            .iter()
79            .step_by(step)
80            .take(target_rows)
81            .cloned()
82            .collect();
83        *column_data = sampled;
84    }
85
86    Ok((columns, true))
87}
88
/// Progress tracking for streaming operations.
///
/// Accumulates row and batch counts reported through [`StreamingProgress::update`]. It
/// derives a completion percentage, an extrapolated total duration, and throughput from the
/// wall-clock time elapsed since [`StreamingProgress::new`].
pub struct StreamingProgress {
    /// Expected total number of rows, when known in advance.
    pub total_rows: Option<u64>,
    /// Sum of all `batch_size` values passed to `update`.
    pub processed_rows: u64,
    /// Number of `update` calls made so far.
    pub batches_processed: u64,
    /// Instant captured by `new`; all timing is measured from here.
    pub start_time: std::time::Instant,
}

impl StreamingProgress {
    /// Create a tracker with no progress recorded and the clock started now.
    pub fn new(total_rows: Option<u64>) -> Self {
        Self {
            total_rows,
            processed_rows: 0,
            batches_processed: 0,
            start_time: std::time::Instant::now(),
        }
    }

    /// Record one completed batch containing `batch_size` rows.
    pub fn update(&mut self, batch_size: u64) {
        self.processed_rows += batch_size;
        self.batches_processed += 1;
    }

    /// Completion percentage, or `None` when `total_rows` is unknown.
    ///
    /// A known total of zero reports `100.0`. The value is not clamped, so it exceeds 100
    /// when more rows are processed than `total_rows` announced.
    pub fn percentage(&self) -> Option<f64> {
        self.total_rows.map(|total| {
            if total == 0 {
                100.0
            } else {
                (self.processed_rows as f64 / total as f64) * 100.0
            }
        })
    }

    /// Wall-clock time since the tracker was created.
    pub fn elapsed(&self) -> std::time::Duration {
        self.start_time.elapsed()
    }

    /// Estimated total run time, extrapolated linearly from elapsed time and percentage.
    ///
    /// Returns `None` when the total is unknown, when no progress has been made yet, or when
    /// the extrapolation does not fit in a `Duration`. The last case can happen with a
    /// vanishingly small percentage after a long run.
    pub fn estimated_total_time(&self) -> Option<std::time::Duration> {
        let percentage = self.percentage().filter(|&p| p > 0.0)?;
        let total_secs = self.elapsed().as_secs_f64() * (100.0 / percentage);
        // `Duration::from_secs_f64` panics when the value overflows `Duration`; a tiny
        // percentage makes that reachable, so use the fallible constructor instead.
        std::time::Duration::try_from_secs_f64(total_secs).ok()
    }

    /// Average rows processed per second since creation; `0.0` if no time has elapsed.
    pub fn rows_per_second(&self) -> f64 {
        let elapsed_secs = self.elapsed().as_secs_f64();
        if elapsed_secs > 0.0 {
            self.processed_rows as f64 / elapsed_secs
        } else {
            0.0
        }
    }
}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Convert borrowed string literals into an owned column.
    fn owned(values: &[&str]) -> Vec<String> {
        values.iter().map(|v| (*v).to_string()).collect()
    }

    #[test]
    fn test_merge_column_batches() {
        let first = HashMap::from([
            ("col1".to_string(), owned(&["a", "b"])),
            ("col2".to_string(), owned(&["1", "2"])),
        ]);
        let second = HashMap::from([
            ("col1".to_string(), owned(&["c", "d"])),
            ("col2".to_string(), owned(&["3", "4"])),
        ]);

        let merged = merge_column_batches(vec![first, second]).expect("Failed to merge batches");

        let col1 = merged.get("col1").expect("col1 not found");
        assert_eq!(col1, &vec!["a", "b", "c", "d"]);
        let col2 = merged.get("col2").expect("col2 not found");
        assert_eq!(col2, &vec!["1", "2", "3", "4"]);
    }

    #[test]
    fn test_memory_estimation() {
        let columns = HashMap::from([("test".to_string(), owned(&["hello", "world"]))]);
        assert_eq!(estimate_memory_usage(&columns), 14);
    }

    #[test]
    fn test_streaming_progress() {
        let mut progress = StreamingProgress::new(Some(1000));
        assert_eq!(progress.percentage(), Some(0.0));

        for expected in [25.0, 50.0] {
            progress.update(250);
            assert_eq!(progress.percentage(), Some(expected));
        }

        assert_eq!(progress.batches_processed, 2);
        assert_eq!(progress.processed_rows, 500);
    }
}