1use crate::DataProfilerError;
4use std::collections::HashMap;
5
/// Settings for streaming (batch-wise) processing.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Batch size; defaults to 10 000.
    /// NOTE(review): presumably counted in rows per batch — confirm against the consumer.
    pub batch_size: usize,
    /// Memory budget expressed in megabytes; defaults to 512.
    pub max_memory_mb: usize,
    /// Optional progress hook receiving two `u64` values; defaults to `None`.
    /// NOTE(review): the arguments look like (processed, total) — confirm against the caller.
    pub progress_callback: Option<fn(u64, u64)>,
}

impl Default for StreamingConfig {
    /// Returns the stock configuration: 10 000 / 512 MB / no callback.
    fn default() -> Self {
        const DEFAULT_BATCH_SIZE: usize = 10_000;
        const DEFAULT_MAX_MEMORY_MB: usize = 512;

        StreamingConfig {
            progress_callback: None,
            max_memory_mb: DEFAULT_MAX_MEMORY_MB,
            batch_size: DEFAULT_BATCH_SIZE,
        }
    }
}
23
24pub fn merge_column_batches(
26 batches: Vec<HashMap<String, Vec<String>>>,
27) -> Result<HashMap<String, Vec<String>>, DataProfilerError> {
28 if batches.is_empty() {
29 return Ok(HashMap::new());
30 }
31
32 let mut merged: HashMap<String, Vec<String>> = HashMap::new();
33
34 for batch in batches {
35 for (column_name, column_data) in batch {
36 merged.entry(column_name).or_default().extend(column_data);
37 }
38 }
39
40 Ok(merged)
41}
42
/// Estimates the payload size of a column map in bytes.
///
/// The estimate is the sum of the byte length of every column name plus the
/// byte length of every value. It counts string contents only; container
/// overhead and spare capacity are not included.
pub fn estimate_memory_usage(columns: &HashMap<String, Vec<String>>) -> usize {
    let mut total_bytes = 0usize;

    for (column_name, values) in columns {
        total_bytes += column_name.len();
        for value in values {
            total_bytes += value.len();
        }
    }

    total_bytes
}
50
51pub fn apply_sampling_if_needed(
53 mut columns: HashMap<String, Vec<String>>,
54 max_memory_mb: usize,
55 sampling_ratio: f64,
56) -> Result<(HashMap<String, Vec<String>>, bool), DataProfilerError> {
57 let memory_usage_bytes = estimate_memory_usage(&columns);
58 let memory_usage_mb = memory_usage_bytes / 1_048_576;
59
60 if memory_usage_mb <= max_memory_mb {
61 return Ok((columns, false));
62 }
63
64 let total_rows = columns.values().next().map(|v| v.len()).unwrap_or(0);
65 let target_rows = (total_rows as f64 * sampling_ratio) as usize;
66
67 if target_rows == 0 {
68 return Ok((HashMap::new(), true));
69 }
70
71 let step = total_rows / target_rows;
72 if step <= 1 {
73 return Ok((columns, false));
74 }
75
76 for column_data in columns.values_mut() {
77 let sampled: Vec<String> = column_data
78 .iter()
79 .step_by(step)
80 .take(target_rows)
81 .cloned()
82 .collect();
83 *column_data = sampled;
84 }
85
86 Ok((columns, true))
87}
88
/// Running counters for a streaming job.
pub struct StreamingProgress {
    /// Total number of rows expected, when known.
    pub total_rows: Option<u64>,
    /// Rows accounted for so far via [`StreamingProgress::update`].
    pub processed_rows: u64,
    /// Number of `update` calls recorded so far.
    pub batches_processed: u64,
    /// Instant at which this tracker was created.
    pub start_time: std::time::Instant,
}

impl StreamingProgress {
    /// Creates a tracker with zeroed counters and the clock started now.
    pub fn new(total_rows: Option<u64>) -> Self {
        Self {
            total_rows,
            processed_rows: 0,
            batches_processed: 0,
            start_time: std::time::Instant::now(),
        }
    }

    /// Records one processed batch containing `batch_size` rows.
    ///
    /// Counters saturate at `u64::MAX` rather than overflowing.
    pub fn update(&mut self, batch_size: u64) {
        self.processed_rows = self.processed_rows.saturating_add(batch_size);
        self.batches_processed = self.batches_processed.saturating_add(1);
    }

    /// Completion percentage, or `None` when the total is unknown.
    ///
    /// A known total of zero reports `100.0`. The value is not clamped, so it
    /// exceeds `100.0` if more rows are processed than `total_rows` states.
    pub fn percentage(&self) -> Option<f64> {
        self.total_rows.map(|total| {
            if total == 0 {
                100.0
            } else {
                (self.processed_rows as f64 / total as f64) * 100.0
            }
        })
    }

    /// Time elapsed since the tracker was created.
    pub fn elapsed(&self) -> std::time::Duration {
        self.start_time.elapsed()
    }

    /// Extrapolates the total run time from the elapsed time and percentage.
    ///
    /// Returns `None` when the total is unknown, when nothing has been
    /// processed yet, or when the extrapolated value cannot be represented as
    /// a `Duration` (for example a vanishingly small percentage).
    pub fn estimated_total_time(&self) -> Option<std::time::Duration> {
        let percentage = self.percentage().filter(|pct| *pct > 0.0)?;
        let total_secs = self.elapsed().as_secs_f64() * (100.0 / percentage);
        // `from_secs_f64` panics on overflow or non-finite input; the fallible
        // variant turns those cases into "no estimate".
        std::time::Duration::try_from_secs_f64(total_secs).ok()
    }

    /// Average throughput in rows per second since creation.
    ///
    /// Returns `0.0` when no measurable time has elapsed.
    pub fn rows_per_second(&self) -> f64 {
        let elapsed_secs = self.elapsed().as_secs_f64();
        if elapsed_secs > 0.0 {
            self.processed_rows as f64 / elapsed_secs
        } else {
            0.0
        }
    }
}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into an owned column.
    fn column(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| value.to_string()).collect()
    }

    /// Merging two batches appends the second batch's values after the first's.
    #[test]
    fn test_merge_column_batches() {
        let first = HashMap::from([
            ("col1".to_string(), column(&["a", "b"])),
            ("col2".to_string(), column(&["1", "2"])),
        ]);
        let second = HashMap::from([
            ("col1".to_string(), column(&["c", "d"])),
            ("col2".to_string(), column(&["3", "4"])),
        ]);

        let merged = merge_column_batches(vec![first, second]).expect("Failed to merge batches");

        let col1 = merged.get("col1").expect("col1 not found");
        assert_eq!(col1, &vec!["a", "b", "c", "d"]);

        let col2 = merged.get("col2").expect("col2 not found");
        assert_eq!(col2, &vec!["1", "2", "3", "4"]);
    }

    /// The estimate is name bytes plus value bytes: 4 + 5 + 5.
    #[test]
    fn test_memory_estimation() {
        let columns = HashMap::from([("test".to_string(), column(&["hello", "world"]))]);

        assert_eq!(estimate_memory_usage(&columns), 14);
    }

    /// Percentage and counters advance with every `update` call.
    #[test]
    fn test_streaming_progress() {
        let mut progress = StreamingProgress::new(Some(1000));
        assert_eq!(progress.percentage(), Some(0.0));

        for expected in [25.0, 50.0] {
            progress.update(250);
            assert_eq!(progress.percentage(), Some(expected));
        }

        assert_eq!(progress.batches_processed, 2);
        assert_eq!(progress.processed_rows, 500);
    }
}