tange_collection/
interfaces.rs1extern crate serde;
3extern crate bincode;
4extern crate uuid;
5extern crate snap;
6
7use std::any::Any;
8use std::fs::{File,remove_file,create_dir_all};
9use std::io::{BufReader,BufWriter};
10use std::marker::PhantomData;
11use std::sync::Arc;
12
13use self::snap::{Writer,Reader};
14use self::serde::{Serialize,Deserialize};
15use self::bincode::{serialize_into, deserialize_from,ErrorKind};
16use self::uuid::Uuid;
17
18pub trait Accumulator<A>: Send + Sync + Clone {
21
22 type VW: ValueWriter<A>;
24
25 fn writer(&self) -> Self::VW;
27
28 fn write_vec(&self, vs: Vec<A>) -> <<Self as Accumulator<A>>::VW as ValueWriter<A>>::Out {
30 let mut out = self.writer();
31 for a in vs {
32 out.add(a)
33 }
34 out.finish()
35 }
36}
37
38pub trait ValueWriter<A>: Sized {
41 type Out: Accumulator<A>;
43
44 fn add(&mut self, item: A) -> ();
46
47 fn extend<I: Iterator<Item=A>>(&mut self, i: &mut I) -> () {
49 for item in i {
50 self.add(item);
51 }
52 }
53
54 fn finish(self) -> Self::Out;
56}
57
58#[derive(Clone)]
60pub struct Memory;
61
62impl <A: Any + Send + Sync + Clone> Accumulator<A> for Memory {
63 type VW = Vec<A>;
64
65 fn writer(&self) -> Self::VW {
66 Vec::new()
67 }
68}
69
70impl <A: Any + Send + Sync + Clone> Accumulator<A> for Vec<A> {
71 type VW = Vec<A>;
72
73 fn writer(&self) -> Self::VW {
74 Vec::new()
75 }
76}
77
78impl <A: Any + Send + Sync + Clone> ValueWriter<A> for Vec<A> {
79 type Out = Vec<A>;
80
81 fn add(&mut self, item: A) -> () {
82 self.push(item);
83 }
84
85 fn finish(mut self) -> Self::Out {
86 self.shrink_to_fit();
87 self
88 }
89}
90
/// A collection whose contents can be read back as values of type `A`.
pub trait Stream<A> {
    /// Value returned by [`Stream::stream`]; iterating it yields `A`s.
    type Iter: IntoIterator<Item = A>;

    /// Produces something that can be iterated to read the stored values.
    fn stream(&self) -> Self::Iter;

    /// Produces another handle of the same type as `self`.
    fn copy(&self) -> Self;
}

/// In-memory streams: both operations clone the whole vector.
impl<A> Stream<A> for Vec<A>
where
    A: Clone,
{
    type Iter = Vec<A>;

    /// Returns a clone of the vector's elements.
    fn stream(&self) -> Self::Iter {
        self.to_vec()
    }

    /// Returns a clone of the vector.
    fn copy(&self) -> Self {
        Clone::clone(self)
    }
}
114
/// Accumulator handle holding a shared path string.
///
/// Cloning a `Disk` clones the `Arc`, so clones share the same string.
#[derive(Clone)]
pub struct Disk(pub Arc<String>);

impl Disk {
    /// Builds a `Disk` from a borrowed path by copying it into a new
    /// reference-counted `String`.
    pub fn from_str(s: &str) -> Self {
        let owned_path = String::from(s);
        Disk(Arc::new(owned_path))
    }
}
125
126pub struct DiskBuffer<A> {
128 root_path: Arc<String>,
129 name: String,
130 pd: PhantomData<A>,
131 out: Writer<BufWriter<File>>
132}
133
134impl <A> DiskBuffer<A> {
135 fn new(path: Arc<String>) -> Self {
136 let name = format!("{}/tange-{}", &path, Uuid::new_v4());
137 {
138 let p: &str = &path;
139 create_dir_all(p).expect("Unable to create directory!");
140 }
141 let fd = File::create(&name).expect("Can't create file!");
142 let bw = BufWriter::new(fd);
143 let encoder = Writer::new(bw);
144 DiskBuffer {
145 root_path: path,
146 name: name,
147 pd: PhantomData,
148 out: encoder
149 }
150 }
151}
152
/// Handle to an optional on-disk file of records of type `A`.
///
/// NOTE(review): this type derives `Clone` and also has a `Drop` impl that
/// removes the named file, so cloning a bare `FileStore` (as opposed to an
/// `Arc<FileStore>`) yields two values that will each try to delete the same
/// file. Confirm that callers only share it through `Arc`.
#[derive(Clone)]
pub struct FileStore<A: Clone + Send + Sync> {
    /// Directory that new files for this store are created in.
    root_path: Arc<String>,
    /// Path of the backing file, or `None` when the store has no file.
    name: Option<String>,
    /// Ties the store to its record type without storing a value.
    pd: PhantomData<A>
}

impl<A: Clone + Send + Sync> FileStore<A> {
    /// Creates a store rooted at `path` that has no backing file.
    pub fn empty(path: Arc<String>) -> Self {
        let name = None;
        FileStore { root_path: path, name, pd: PhantomData }
    }
}
172
173impl <A: Clone + Send + Sync> Drop for FileStore<A> {
175 fn drop(&mut self) {
176 if let Some(ref name) = self.name {
177 if let Err(e) = remove_file(name) {
178 eprintln!("Error Deleting {}: {:?}J", name, e);
179 }
180 }
181 }
182}
183
184impl <A: Serialize + Clone + Send + Sync> Accumulator<A> for Disk {
185 type VW = DiskBuffer<A>;
186
187 fn writer(&self) -> Self::VW {
188 DiskBuffer::new(self.0.clone())
189 }
190}
191
192impl <A: Serialize + Clone + Send + Sync> Accumulator<A> for Arc<FileStore<A>> {
193 type VW = DiskBuffer<A>;
194
195 fn writer(&self) -> Self::VW {
196 DiskBuffer::new(self.root_path.clone())
197 }
198}
199
200impl <A: Serialize + Clone + Send + Sync> ValueWriter<A> for DiskBuffer<A> {
201 type Out = Arc<FileStore<A>>;
202
203 fn add(&mut self, item: A) -> () {
204 serialize_into(&mut self.out, &item).expect("Couldn't write record!");
205 }
206
207 fn finish(self) -> Self::Out {
208 Arc::new(FileStore {
209 root_path: self.root_path.clone(),
210 name: Some(self.name),
211 pd: PhantomData
212 })
213 }
214}
215
216
217impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> Stream<A> for Arc<FileStore<A>> {
218 type Iter = RecordFile<A>;
219
220 fn stream(&self) -> Self::Iter {
221 RecordFile(self.name.clone(), PhantomData)
222 }
223
224 fn copy(&self) -> Self { self.clone() }
225}
226
227pub struct RecordFile<A>(Option<String>, PhantomData<A>);
229
230impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> IntoIterator for RecordFile<A> {
231 type Item = A;
232 type IntoIter = RecordStreamer<A>;
233
234 fn into_iter(self) -> Self::IntoIter {
235 if let Some(ref n) = self.0 {
236 let fd = File::open(n).expect("File didn't exist on open!");
237 let brfd = BufReader::new(fd);
238 let decoder = Reader::new(brfd);
239 RecordStreamer(Some(decoder), PhantomData)
240 } else {
241 RecordStreamer(None, PhantomData)
242 }
243 }
244}
245
246pub struct RecordStreamer<A>(Option<Reader<BufReader<File>>>, PhantomData<A>);
248
249impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> Iterator for RecordStreamer<A> {
250 type Item = A;
251
252 fn next(&mut self) -> Option<Self::Item> {
253 if let Some(ref mut bw) = self.0 {
254 match deserialize_from(bw) {
256 Ok(record) => Some(record),
257 Err(e) => {
258 let ek: &ErrorKind = &e;
259 match ek {
260 &ErrorKind::DeserializeAnyNotSupported => {
261 eprintln!("Bincode doesn't work with certain types!");
262 panic!();
263 },
264 _ => None
265 }
266 }
267 }
268 } else {
269 None
270 }
271 }
272}