1use std::time::SystemTime;
2use std::path::PathBuf;
3
4extern crate quick_csv;
5extern crate csv;
6extern crate glob;
7extern crate pretty_bytes;
8
9extern crate scoped_threadpool;
10use scoped_threadpool::Pool;
11
12pub type InputFile = quick_csv::Csv<std::io::BufReader<std::fs::File>>;
13pub type OutputFile = csv::Writer<std::fs::File>;
14
15pub trait KRNSMovementOperator {
16 type T: Sync;
17
18 fn summarize_one_file(&self, input: &mut InputFile, output: &mut OutputFile);
20 fn create_out(&self) -> Self::T;
21
22 #[allow(unused_variables)]
24 fn aggregate_one_summary(&self, input: &mut InputFile, out: &mut Self::T) {}
25
26 #[allow(unused_variables)]
27 fn write_aggregate(&self, writer: &mut OutputFile, out: &Self::T) {}
28
29 fn summary_dir(&self) -> PathBuf {
30 let out = std::env::current_dir().unwrap();
31 out.join("summaries")
32 }
33
34 fn summary_extension(&self) -> &str {
35 "tsv"
36 }
37
38 fn summary_glob_pattern(&self) -> String {
39 let mut out = self.summary_dir();
40 out.push("*");
41 out.set_extension(self.summary_extension());
42 let pat = &out.to_str().expect("Couldn't construct summary globber");
43 pat.to_string()
44 }
45
46 fn summary_files(&self) -> Vec<PathBuf> {
47 self.glob_to_pathbufs(&self.summary_glob_pattern())
48 }
49
50 fn glob_to_pathbufs(&self, pat: &str) -> Vec<PathBuf> {
51 let mut out: Vec<_> = vec![];
52 let globbed = glob::glob(pat).unwrap();
53 for entry in globbed {
54 match entry {
55 Ok(x) => {
56 if x != self.aggregate_filename() {
57 out.push(x);
58 }
59 }
60 _ => panic!("funky globber"),
61 }
62 }
63 out
64 }
65
66 fn aggregate_basename(&self) -> &str {
67 "aggregated"
68 }
69
70 fn aggregate_filename(&self) -> PathBuf {
71 let mut out = self.summary_dir();
72 out.push(self.aggregate_basename());
73 out.set_extension(self.summary_extension());
74 out
75 }
76
77 fn summarize_threads(&self) -> Option<u32> {
78 None
79 }
80
81 fn summary_path(&self, p: &PathBuf) -> PathBuf {
83 let mut dir = self.summary_dir();
84 let p_name = p.file_name().expect("need a file name");
85 dir.push(p_name);
86 dir.set_extension(self.summary_extension());
87 dir
88 }
89
90
91 fn aggregate_summaries(&self) {
92 let summary_files = self.summary_files();
93 let mut out = self.create_out();
94 for p in &summary_files {
95 println!(" [knrs aggregate] input:\t{}", p.display());
96 let mut input = quick_csv::Csv::from_file(p)
97 .expect("Couldn't find input file")
98 .delimiter(b'\t')
99 .has_header(true);
100 self.aggregate_one_summary(&mut input, &mut out);
101 }
102
103 let out_fn = self.aggregate_filename();
104 println!(" [knrs aggregate] output:\t{}", out_fn.display());
105
106 let mut writer = csv::Writer::from_file(out_fn)
107 .expect("couldn't create csv for writing")
108 .delimiter(b'\t');
109
110 self.write_aggregate(&mut writer, &mut out);
111 }
112
113 fn summarize(&self, paths: &[PathBuf])
114 where Self: Sync
115 {
116 let start_time = SystemTime::now();
117 if let Some(threads) = self.summarize_threads() {
118 let mut pool = Pool::new(threads);
119 pool.scoped(|scoped| {
120 for p in paths {
121 scoped.execute(move || {
122 let p2 = self.summary_path(&p);
123 let (mut input, mut output) = pre_summarize(p, &p2);
124 self.summarize_one_file(&mut input, &mut output);
125 });
126 }
127 });
128 } else {
129 for p in paths {
130 let p2 = self.summary_path(&p);
131 let (mut input, mut output) = pre_summarize(p, &p2);
132 self.summarize_one_file(&mut input, &mut output);
133 }
134 }
135 println!(" [knrs] Total time {:?}", start_time.elapsed().unwrap());
136 }
137}
138
139pub fn pre_summarize(p: &PathBuf, p2: &PathBuf) -> (InputFile, OutputFile) {
140 let p_bytes = std::fs::metadata(&p).unwrap().len();
141 println!(" [knrs] input:\t{} ({})\n [knrs] output:\t{}",
142 p.display(),
143 pretty_bytes::converter::convert(p_bytes as f64),
144 p2.display());
145
146 let p2_par = p2.parent().expect("p2 should have a parent");
147 if !p2_par.is_dir() {
148 if p2_par.exists() {
149 panic!("Output path exists, but isn't directory");
150 } else {
151 println!(" [knrs] creating output directory:\t{}", p2_par.display());
152 std::fs::create_dir_all(p2_par).unwrap_or_else(|why| {
153 println!("! {:?}", why.kind());
154 });
155 }
156 }
157
158 let input = quick_csv::Csv::from_file(p)
159 .expect("Couldn't find input file")
160 .delimiter(b'\t')
161 .has_header(true);
162
163 let output = csv::Writer::from_file(&p2)
164 .expect("couldn't create csv for writing")
165 .delimiter(b'\t');
166
167 return (input, output);
168}