knrs/
lib.rs

1use std::time::SystemTime;
2use std::path::PathBuf;
3
4extern crate quick_csv;
5extern crate csv;
6extern crate glob;
7extern crate pretty_bytes;
8
9extern crate scoped_threadpool;
10use scoped_threadpool::Pool;
11
/// Reader type handed to the summarizing and aggregating callbacks:
/// a `quick_csv::Csv` wrapped around a buffered `std::fs::File`.
12pub type InputFile = quick_csv::Csv<std::io::BufReader<std::fs::File>>;
/// Writer type handed to the summarizing and aggregating callbacks:
/// a `csv::Writer` that writes straight into a `std::fs::File`.
13pub type OutputFile = csv::Writer<std::fs::File>;
14
15pub trait KRNSMovementOperator {
16    type T: Sync;
17
18    // Required to be implemented by all implementing structs
19    fn summarize_one_file(&self, input: &mut InputFile, output: &mut OutputFile);
20    fn create_out(&self) -> Self::T;
21
22    // functions with default implementations
23    #[allow(unused_variables)]
24    fn aggregate_one_summary(&self, input: &mut InputFile, out: &mut Self::T) {}
25
26    #[allow(unused_variables)]
27    fn write_aggregate(&self, writer: &mut OutputFile, out: &Self::T) {}
28
29    fn summary_dir(&self) -> PathBuf {
30        let out = std::env::current_dir().unwrap();
31        out.join("summaries")
32    }
33
34    fn summary_extension(&self) -> &str {
35        "tsv"
36    }
37
38    fn summary_glob_pattern(&self) -> String {
39        let mut out = self.summary_dir();
40        out.push("*");
41        out.set_extension(self.summary_extension());
42        let pat = &out.to_str().expect("Couldn't construct summary globber");
43        pat.to_string()
44    }
45
46    fn summary_files(&self) -> Vec<PathBuf> {
47        self.glob_to_pathbufs(&self.summary_glob_pattern())
48    }
49
50    fn glob_to_pathbufs(&self, pat: &str) -> Vec<PathBuf> {
51        let mut out: Vec<_> = vec![];
52        let globbed = glob::glob(pat).unwrap();
53        for entry in globbed {
54            match entry {
55                Ok(x) => {
56                    if x != self.aggregate_filename() {
57                        out.push(x);
58                    }
59                }
60                _ => panic!("funky globber"),
61            }
62        }
63        out
64    }
65
66    fn aggregate_basename(&self) -> &str {
67        "aggregated"
68    }
69
70    fn aggregate_filename(&self) -> PathBuf {
71        let mut out = self.summary_dir();
72        out.push(self.aggregate_basename());
73        out.set_extension(self.summary_extension());
74        out
75    }
76
77    fn summarize_threads(&self) -> Option<u32> {
78        None
79    }
80
81    // Return path to sumary file, given current file.
82    fn summary_path(&self, p: &PathBuf) -> PathBuf {
83        let mut dir = self.summary_dir();
84        let p_name = p.file_name().expect("need a file name");
85        dir.push(p_name);
86        dir.set_extension(self.summary_extension());
87        dir
88    }
89
90
91    fn aggregate_summaries(&self) {
92        let summary_files = self.summary_files();
93        let mut out = self.create_out();
94        for p in &summary_files {
95            println!("  [knrs aggregate] input:\t{}", p.display());
96            let mut input = quick_csv::Csv::from_file(p)
97                .expect("Couldn't find input file")
98                .delimiter(b'\t')
99                .has_header(true);
100            self.aggregate_one_summary(&mut input, &mut out);
101        }
102
103        let out_fn = self.aggregate_filename();
104        println!("  [knrs aggregate] output:\t{}", out_fn.display());
105
106        let mut writer = csv::Writer::from_file(out_fn)
107            .expect("couldn't create csv for writing")
108            .delimiter(b'\t');
109
110        self.write_aggregate(&mut writer, &mut out);
111    }
112
113    fn summarize(&self, paths: &[PathBuf])
114        where Self: Sync
115    {
116        let start_time = SystemTime::now();
117        if let Some(threads) = self.summarize_threads() {
118            let mut pool = Pool::new(threads);
119            pool.scoped(|scoped| {
120                for p in paths {
121                    scoped.execute(move || {
122 	                let p2 = self.summary_path(&p);
123                        let (mut input, mut output) = pre_summarize(p, &p2);
124                        self.summarize_one_file(&mut input, &mut output);
125                    });
126                }
127            });
128        } else {
129            for p in paths {
130                let p2 = self.summary_path(&p);
131                let (mut input, mut output) = pre_summarize(p, &p2);
132                self.summarize_one_file(&mut input, &mut output);
133            }
134        }
135        println!(" [knrs] Total time {:?}", start_time.elapsed().unwrap());
136    }
137}
138
139pub fn pre_summarize(p: &PathBuf, p2: &PathBuf) -> (InputFile, OutputFile) {
140    let p_bytes = std::fs::metadata(&p).unwrap().len();
141    println!("  [knrs] input:\t{} ({})\n  [knrs] output:\t{}",
142             p.display(),
143             pretty_bytes::converter::convert(p_bytes as f64),
144             p2.display());
145
146    let p2_par = p2.parent().expect("p2 should have a parent");
147    if !p2_par.is_dir() {
148        if p2_par.exists() {
149            panic!("Output path exists, but isn't directory");
150        } else {
151            println!("  [knrs] creating output directory:\t{}", p2_par.display());
152            std::fs::create_dir_all(p2_par).unwrap_or_else(|why| {
153                println!("! {:?}", why.kind());
154            });
155        }
156    }
157
158    let input = quick_csv::Csv::from_file(p)
159        .expect("Couldn't find input file")
160        .delimiter(b'\t')
161        .has_header(true);
162
163    let output = csv::Writer::from_file(&p2)
164        .expect("couldn't create csv for writing")
165        .delimiter(b'\t');
166
167    return (input, output);
168}