closure_core/
lib.rs

1//! CLOSURE: complete listing of original samples of underlying raw evidence
2//! 
3//! Crate closure-core implements the CLOSURE technique for efficiently reconstructing
4//! all possible distributions of raw data from summary statistics. It is not
5//! about the Rust feature called closure.
6//! 
7//! The only API users are likely to need is `dfs_parallel()`. This function applies
8//! the lower-level `dfs_branch()` in parallel and writes results to disk (currently
9//! into a CSV file, but this may change in the future.)
10//! 
11//! Most of the code was written by Claude 3.5, translating Python code by Nathanael Larigaldie.
12
13
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

use csv::WriterBuilder;
use indicatif::{ProgressBar, ProgressStyle};
use rayon::prelude::*;
22
23
/// Struct to hold combinations of possible raw data
///
/// Each row in the tabular structure written to disk is collected from `values`.
/// The other two fields are only used at runtime, to extend a partial
/// combination without recomputing its summary statistics from scratch.
#[derive(Clone)]
struct Combination {
    /// Values chosen so far, in non-decreasing order (see `dfs_branch`,
    /// which only appends values >= the current last element).
    values: Vec<i32>,
    /// Sum of `values`, maintained incrementally.
    running_sum: f64,
    /// Running sum of squared deviations from the mean (Welford's M2),
    /// from which the sample standard deviation is derived.
    running_m2: f64,
}
34
35
/// Calculates the number of initial combinations that will be processed in parallel
///
/// Each starting combination is a pair (i, j) with min_scale <= i <= j <= max_scale,
/// i.e. a multiset of size 2 drawn from the scale range. For a range of `n` points
/// there are n * (n + 1) / 2 such pairs.
fn count_initial_combinations(min_scale: i32, max_scale: i32) -> i32 {
    let n = max_scale - min_scale + 1;
    n * (n + 1) / 2
}
44
45
46/// Collect all valid combinations from a starting point
47pub fn dfs_branch(
48    start_combination: Vec<i32>,
49    running_sum_init: f64,
50    running_m2_init: f64,
51    n: usize,
52    target_sum_upper: f64,
53    target_sum_lower: f64,
54    target_sd_upper: f64,
55    target_sd_lower: f64,
56    min_scale_sum: &[i32],
57    max_scale_sum: &[i32],
58    n_minus_1: usize,
59    max_scale_plus_1: i32,
60) -> Vec<Vec<i32>> {
61    let mut stack = VecDeque::new();
62    let mut results = Vec::new();
63    
64    // Initialize stack with starting combination
65    stack.push_back(Combination {
66        values: start_combination,
67        running_sum: running_sum_init,
68        running_m2: running_m2_init,
69    });
70    
71    while let Some(current) = stack.pop_back() {
72        // Check if we've reached desired length
73        if current.values.len() >= n {
74            let current_std = (current.running_m2 / n_minus_1 as f64).sqrt();
75            if current_std >= target_sd_lower {
76                results.push(current.values);
77            }
78            continue;
79        }
80
81        // Calculate remaining positions to fill
82        let n_left = n_minus_1 - current.values.len();
83        let next_n = current.values.len() + 1;
84        let last_value = *current.values.last().unwrap();
85
86        // Try each possible next value
87        for next_value in last_value..max_scale_plus_1 {
88            // Early pruning based on mean bounds
89            let next_sum = current.running_sum + next_value as f64;
90            let minmean = next_sum + min_scale_sum[n_left] as f64;
91            if minmean > target_sum_upper {
92                break; // No need to try larger values
93            }
94            
95            let maxmean = next_sum + max_scale_sum[n_left] as f64;
96            if maxmean < target_sum_lower {
97                continue;
98            }
99
100            // Calculate standard deviation metrics
101            let next_mean = next_sum / next_n as f64;
102            let delta = next_value as f64 - current.running_sum / current.values.len() as f64;
103            let delta2 = next_value as f64 - next_mean;
104            let next_m2 = current.running_m2 + delta * delta2;
105            
106            // Early pruning based on standard deviation
107            let min_sd = (next_m2 / n_minus_1 as f64).sqrt();
108            if min_sd > target_sd_upper {
109                continue;
110            }
111
112            // Add valid combination to stack
113            let mut new_values = current.values.clone();
114            new_values.push(next_value);
115            stack.push_back(Combination {
116                values: new_values,
117                running_sum: next_sum,
118                running_m2: next_m2,
119            });
120        }
121    }
122
123    results
124}
125
126
127/// Run CLOSURE across starting combinations and write results to disk
128pub fn dfs_parallel(
129    min_scale: i32,
130    max_scale: i32,
131    n: usize,
132    target_sum: f64,
133    target_sd: f64,
134    rounding_error_sums: f64,
135    rounding_error_sds: f64,
136    output_file: &str,
137) -> io::Result<()> {
138    let start_time = Instant::now();
139    
140    // Calculate and print the number of initial parallel tasks
141    let initial_count = count_initial_combinations(min_scale, max_scale);
142    println!("Number of initial combinations to process: {}", initial_count);
143    
144    // Calculate bounds for target metrics
145    let target_sum_upper = target_sum + rounding_error_sums;
146    let target_sum_lower = target_sum - rounding_error_sums;
147    let target_sd_upper = target_sd + rounding_error_sds;
148    let target_sd_lower = target_sd - rounding_error_sds;
149    
150    // Precompute scale sums for optimization
151    let min_scale_sum: Vec<i32> = (0..n)
152        .map(|x| min_scale * x as i32)
153        .collect();
154    let max_scale_sum: Vec<i32> = (0..n)
155        .map(|x| max_scale * x as i32)
156        .collect();
157    
158    let n_minus_1 = n - 1;
159    let max_scale_plus_1 = max_scale + 1;
160
161    // Generate initial combinations for parallel processing
162    let mut initial_combinations = Vec::new();
163    for i in min_scale..=max_scale {
164        for j in i..=max_scale {
165            let initial_combination = vec![i, j];
166            let running_sum = (i + j) as f64;
167            let current_mean = running_sum / 2.0;
168            let current_m2 = (i as f64 - current_mean).powi(2) + 
169                            (j as f64 - current_mean).powi(2);
170            initial_combinations.push((initial_combination, running_sum, current_m2));
171        }
172    }
173
174    // Initialize CSV file with headers
175    let file = File::create(output_file)?;
176    let mut writer = WriterBuilder::new()
177        .has_headers(true)
178        .from_writer(file);
179
180    // Write header row
181    let header: Vec<String> = (1..=n)
182        .map(|i| format!("n{}", i))
183        .collect();
184    writer.write_record(&header)?;
185    writer.flush()?;
186
187    // Initialize progress bar
188    let bar = ProgressBar::new(initial_combinations.len() as u64);
189    bar.set_style(
190        ProgressStyle::default_bar()
191            .template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}")
192            .unwrap()
193            .progress_chars("=>-")
194    );
195
196    // Setup shared writer for parallel processing
197    let writer = Arc::new(Mutex::new(
198        WriterBuilder::new()
199            .has_headers(false)
200            .from_writer(
201                OpenOptions::new()
202                    .write(true)
203                    .append(true)
204                    .open(output_file)?
205            )
206    ));
207
208    // Process combinations in parallel
209    initial_combinations
210        .par_iter()
211        .for_each(|(combo, running_sum, running_m2)| {
212            let results = dfs_branch(
213                combo.clone(),
214                *running_sum,
215                *running_m2,
216                n,
217                target_sum_upper,
218                target_sum_lower,
219                target_sd_upper,
220                target_sd_lower,
221                &min_scale_sum,
222                &max_scale_sum,
223                n_minus_1,
224                max_scale_plus_1,
225            );
226
227            // Write all results from this branch at once
228            if !results.is_empty() {
229                let mut writer = writer.lock().unwrap();
230                for result in results {
231                    writer
232                        .write_record(
233                            &result
234                                .iter()
235                                .map(|x| x.to_string())
236                                .collect::<Vec<String>>()
237                        )
238                        .unwrap();
239                }
240                writer.flush().unwrap();
241            }
242            bar.inc(1);
243    });
244
245    bar.finish_with_message("Done!");
246
247    // Print execution time
248    let duration = start_time.elapsed();
249    println!("Execution time: {:.2} seconds", duration.as_secs_f64());
250
251    // Count and print total valid combinations
252    let file = File::open(output_file)?;
253    let reader = csv::ReaderBuilder::new()
254        .has_headers(true)
255        .from_reader(file);
256    let count = reader.into_records().count();
257    println!("Number of valid combinations: {}", count);
258
259    Ok(())
260}
261
262// fn main() -> io::Result<()> {
263//     let min_scale = 1;
264//     let max_scale = 7;
265//     let n = 30;
266//     let target_mean = 5.0;
267//     let target_sum = target_mean * n as f64;
268//     let target_sd = 2.78;
269//     let rounding_error_means = 0.01;
270//     let rounding_error_sums = rounding_error_means * n as f64;
271//     let rounding_error_sds = 0.01;
272//     let output_file = "parallel_results.csv";
273// 
274//     dfs_parallel(
275//         min_scale,
276//         max_scale,
277//         n,
278//         target_sum,
279//         target_sd,
280//         rounding_error_sums,
281//         rounding_error_sds,
282//         output_file,
283//     )
284// }