Skip to main content

tange_collection/
interfaces.rs

//! Defines the internal collections traits and objects.
2extern crate serde;
3extern crate bincode;
4extern crate uuid;
5extern crate snap;
6
7use std::any::Any;
8use std::fs::{File,remove_file,create_dir_all};
9use std::io::{BufReader,BufWriter};
10use std::marker::PhantomData;
11use std::sync::Arc;
12
13use self::snap::{Writer,Reader};
14use self::serde::{Serialize,Deserialize};
15use self::bincode::{serialize_into, deserialize_from,ErrorKind};
16use self::uuid::Uuid;
17
18/// Accumulators are object which can create 'Writers', using effectively the Builder
19/// pattern
20pub trait Accumulator<A>: Send + Sync + Clone  {
21
22    /// ValueWriter created
23    type VW: ValueWriter<A>;
24    
25    /// Create a new ValueWriter
26    fn writer(&self) -> Self::VW;
27
28    /// Convert a Vec into a ValueWriter output
29    fn write_vec(&self, vs: Vec<A>) -> <<Self as Accumulator<A>>::VW as ValueWriter<A>>::Out {
30        let mut out = self.writer();
31        for a in vs {
32            out.add(a)
33        }
34        out.finish()
35    }
36}
37
38/// ValueWriters write Values into some internal state.  When finished, yields some
39/// construct that 'contains' the output.
40pub trait ValueWriter<A>: Sized {
41    /// Value Store
42    type Out: Accumulator<A>;
43
44    /// Add an element to the ValueWriter
45    fn add(&mut self, item: A) -> ();
46
47    /// Writes an iterator to the ValueWriter
48    fn extend<I: Iterator<Item=A>>(&mut self, i: &mut I) -> () {
49        for item in i {
50            self.add(item);
51        }
52    }
53
54    /// Close the ValueWriter, returning the store
55    fn finish(self) -> Self::Out;
56}
57
58/// Defines an Accumulator that writes values in memory, using Vec as the store.
59#[derive(Clone)]
60pub struct Memory;
61
62impl <A: Any + Send + Sync + Clone> Accumulator<A> for Memory {
63    type VW = Vec<A>;
64
65    fn writer(&self) -> Self::VW {
66        Vec::new()
67    }
68}
69
70impl <A: Any + Send + Sync + Clone> Accumulator<A> for Vec<A> {
71    type VW = Vec<A>;
72
73    fn writer(&self) -> Self::VW {
74        Vec::new()
75    }
76}
77
78impl <A: Any + Send + Sync + Clone> ValueWriter<A> for Vec<A> {
79    type Out = Vec<A>;
80
81    fn add(&mut self, item: A) -> () {
82        self.push(item);
83    }
84
85    fn finish(mut self) -> Self::Out {
86        self.shrink_to_fit();
87        self
88    }
89}
90
/// Common interface for reading values back out of a store
pub trait Stream<A> {
    /// Something that can be turned into an iterator of owned `A` values
    type Iter: IntoIterator<Item = A>;

    /// Produces the values of the store as owned items
    fn stream(&self) -> Self::Iter;

    /// Produces a copy of the store itself
    fn copy(&self) -> Self;
}
102
103impl <A: Clone> Stream<A> for Vec<A> {
104    type Iter = Vec<A>;
105
106    fn stream(&self) -> Self::Iter {
107        self.clone()
108    }
109
110    fn copy(&self) -> Self {
111        self.clone()
112    }
113}
114
/// Accumulator that writes values to files under a directory.  The wrapped
/// string is the directory path, shared behind an `Arc`.
#[derive(Clone)]
pub struct Disk(pub Arc<String>);

impl Disk {
    /// Builds a `Disk` from a path given as a string slice
    pub fn from_str(s: &str) -> Self {
        let root = String::from(s);
        Disk(Arc::new(root))
    }
}
125
126/// An open buffer for writing records to disk
127pub struct DiskBuffer<A> {
128    root_path: Arc<String>, 
129    name: String,
130    pd: PhantomData<A>,
131    out: Writer<BufWriter<File>>
132}
133
134impl <A> DiskBuffer<A> {
135    fn new(path: Arc<String>) -> Self {
136        let name = format!("{}/tange-{}", &path, Uuid::new_v4());
137        {
138            let p: &str = &path;
139            create_dir_all(p).expect("Unable to create directory!");
140        }
141        let fd = File::create(&name).expect("Can't create file!");
142        let bw = BufWriter::new(fd);
143        let encoder = Writer::new(bw);
144        DiskBuffer { 
145            root_path: path, 
146            name: name, 
147            pd: PhantomData,
148            out: encoder
149        }
150    }
151}
152
/// Handle to an optional temporary record file that lives under `root_path`.
///
/// When `name` is `Some`, the file it names is deleted as soon as this value
/// is dropped.
///
/// NOTE(review): the derived `Clone` copies `name`, so two clones of the same
/// store will each try to delete the same file on drop.  The rest of this
/// file only hands the store out behind an `Arc`.
#[derive(Clone)]
pub struct FileStore<A: Clone + Send + Sync> {
    /// Directory in which new files for this store are created
    root_path: Arc<String>,
    /// Path of the backing file, or `None` for an empty store
    name: Option<String>,
    pd: PhantomData<A>
}

impl <A: Clone + Send + Sync> FileStore<A> {

    /// Create an empty FileStore (one with no backing file) at the given path
    pub fn empty(path: Arc<String>) -> Self {
        FileStore {
            root_path: path,
            name: None,
            pd: PhantomData
        }
    }
}

// Delete the temporary file on disk when dropped
impl <A: Clone + Send + Sync> Drop for FileStore<A> {
    fn drop(&mut self) {
        if let Some(ref name) = self.name {
            // Best effort: a destructor cannot return the error, so report it
            // on stderr instead of panicking.
            if let Err(e) = remove_file(name) {
                eprintln!("Error Deleting {}: {:?}", name, e);
            }
        }
    }
}
183
184impl <A: Serialize + Clone + Send + Sync> Accumulator<A> for Disk {
185    type VW = DiskBuffer<A>;
186
187    fn writer(&self) -> Self::VW {
188        DiskBuffer::new(self.0.clone())
189    }
190}
191
192impl <A: Serialize + Clone + Send + Sync> Accumulator<A> for Arc<FileStore<A>> {
193    type VW = DiskBuffer<A>;
194
195    fn writer(&self) -> Self::VW {
196        DiskBuffer::new(self.root_path.clone())
197    }
198}
199
200impl <A: Serialize + Clone + Send + Sync> ValueWriter<A> for DiskBuffer<A> {
201    type Out = Arc<FileStore<A>>;
202
203    fn add(&mut self, item: A) -> () {
204        serialize_into(&mut self.out, &item).expect("Couldn't write record!");
205    }
206
207    fn finish(self) -> Self::Out {
208        Arc::new(FileStore { 
209            root_path: self.root_path.clone(), 
210            name: Some(self.name), 
211            pd: PhantomData
212        })
213    }
214}
215
216
217impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> Stream<A> for Arc<FileStore<A>> {
218    type Iter = RecordFile<A>;
219
220    fn stream(&self) -> Self::Iter {
221        RecordFile(self.name.clone(), PhantomData)
222    }
223
224    fn copy(&self) -> Self { self.clone() }
225}
226
227/// Streams records from an optional File.  If the file is none, returns the Empty iterator
228pub struct RecordFile<A>(Option<String>, PhantomData<A>);
229
230impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> IntoIterator for RecordFile<A> {
231    type Item = A;
232    type IntoIter = RecordStreamer<A>;
233
234    fn into_iter(self) -> Self::IntoIter {
235        if let Some(ref n) = self.0 {
236            let fd = File::open(n).expect("File didn't exist on open!");
237            let brfd = BufReader::new(fd);
238            let decoder = Reader::new(brfd);
239            RecordStreamer(Some(decoder), PhantomData)
240        } else {
241            RecordStreamer(None, PhantomData)
242        }
243    }
244}
245
246/// Stream Records from an open file
247pub struct RecordStreamer<A>(Option<Reader<BufReader<File>>>, PhantomData<A>);
248
249impl <A: Clone + Send + Sync + for<'de> Deserialize<'de>> Iterator for RecordStreamer<A> {
250    type Item = A;
251
252    fn next(&mut self) -> Option<Self::Item> {
253        if let Some(ref mut bw) = self.0 {
254            //deserialize_from(bw).expect("Failure on deserialization!")
255            match deserialize_from(bw) {
256                Ok(record) => Some(record),
257                Err(e) => {
258                    let ek: &ErrorKind = &e;
259                    match ek {
260                        &ErrorKind::DeserializeAnyNotSupported => {
261                            eprintln!("Bincode doesn't work with certain types!");
262                            panic!();
263                        },
264                        _ => None
265                    }
266                }
267            }
268        } else {
269            None
270        }
271    }
272}