1use std::io;
2use std::iter;
3use std::path::Path;
4
5use csv;
6use memmap::Mmap;
7
8use crate::error::{Error, Result};
9use crate::index::{csv_file, csv_mmap, id};
10use crate::record::AKA;
11use crate::util::IMDB_AKAS;
12
/// The file name of the AKA index, resolved relative to the index directory.
///
/// Each entry in the index associates a title id with one 64-bit integer
/// that packs two values: the number of AKA records for that title in the
/// upper 16 bits, and the byte offset of the first of those records within
/// the AKA data file in the lower 48 bits.
const AKAS: &str = "akas.fst";
20
21#[derive(Debug)]
27pub struct Index {
28 akas: csv::Reader<io::Cursor<Mmap>>,
29 idx: id::IndexReader,
30}
31
32impl Index {
33 pub fn open<P1: AsRef<Path>, P2: AsRef<Path>>(
37 data_dir: P1,
38 index_dir: P2,
39 ) -> Result<Index> {
40 Ok(Index {
41 akas: unsafe { csv_mmap(data_dir.as_ref().join(IMDB_AKAS))? },
44 idx: id::IndexReader::from_path(index_dir.as_ref().join(AKAS))?,
45 })
46 }
47
48 pub fn create<P1: AsRef<Path>, P2: AsRef<Path>>(
51 data_dir: P1,
52 index_dir: P2,
53 ) -> Result<Index> {
54 let data_dir = data_dir.as_ref();
55 let index_dir = index_dir.as_ref();
56
57 let rdr = csv_file(data_dir.join(IMDB_AKAS))?;
58 let mut wtr = id::IndexSortedWriter::from_path(index_dir.join(AKAS))?;
59 let mut count = 0u64;
60 for result in AKAIndexRecords::new(rdr) {
61 let record = result?;
62 wtr.insert(&record.id, (record.count << 48) | record.offset)?;
63 count += record.count;
64 }
65 wtr.finish()?;
66
67 log::info!("{} alternate names indexed", count);
68 Index::open(data_dir, index_dir)
69 }
70
71 pub fn find(&mut self, id: &[u8]) -> Result<AKARecordIter> {
74 match self.idx.get(id) {
75 None => Ok(AKARecordIter(None)),
76 Some(v) => {
77 let count = (v >> 48) as usize;
78 let offset = v & ((1 << 48) - 1);
79
80 let mut pos = csv::Position::new();
81 pos.set_byte(offset);
82 self.akas.seek(pos).map_err(Error::csv)?;
83
84 Ok(AKARecordIter(Some(self.akas.deserialize().take(count))))
85 }
86 }
87 }
88}
89
90pub struct AKARecordIter<'r>(
99 Option<iter::Take<csv::DeserializeRecordsIter<'r, io::Cursor<Mmap>, AKA>>>,
100);
101
102impl<'r> Iterator for AKARecordIter<'r> {
103 type Item = Result<AKA>;
104
105 fn next(&mut self) -> Option<Result<AKA>> {
106 let next = match self.0.as_mut().and_then(|it| it.next()) {
107 None => return None,
108 Some(next) => next,
109 };
110 match next {
111 Ok(next) => Some(Ok(next)),
112 Err(err) => Some(Err(Error::csv(err))),
113 }
114 }
115}
116
/// One entry destined for the AKA index: a title id together with the
/// location and number of its AKA records in the data file.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AKAIndexRecord {
    /// The title id, copied from the first field of the group's records.
    id: Vec<u8>,
    /// Byte offset, as reported by the CSV reader, of the first record in
    /// the group.
    offset: u64,
    /// Number of consecutive records that share `id`.
    count: u64,
}
127
128#[derive(Debug)]
137struct AKAIndexRecords<R> {
138 rdr: csv::Reader<R>,
140 record: csv::ByteRecord,
142 done: bool,
144}
145
146impl<R: io::Read> AKAIndexRecords<R> {
147 fn new(rdr: csv::Reader<R>) -> AKAIndexRecords<R> {
149 AKAIndexRecords { rdr, record: csv::ByteRecord::new(), done: false }
150 }
151}
152
153impl<R: io::Read> Iterator for AKAIndexRecords<R> {
154 type Item = Result<AKAIndexRecord>;
155
156 fn next(&mut self) -> Option<Result<AKAIndexRecord>> {
162 macro_rules! itry {
163 ($e:expr) => {
164 match $e {
165 Err(err) => return Some(Err(Error::csv(err))),
166 Ok(v) => v,
167 }
168 };
169 }
170
171 if self.done {
172 return None;
173 }
174 if self.record.is_empty() {
177 if !itry!(self.rdr.read_byte_record(&mut self.record)) {
178 return None;
179 }
180 }
181 let mut irecord = AKAIndexRecord {
182 id: self.record[0].to_vec(),
183 offset: self.record.position().expect("position on row").byte(),
184 count: 1,
185 };
186 while itry!(self.rdr.read_byte_record(&mut self.record)) {
187 if irecord.id != &self.record[0] {
188 break;
189 }
190 irecord.count += 1;
191 }
192 if self.rdr.is_done() {
194 self.done = true;
195 }
196 Some(Ok(irecord))
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::csv_reader_builder;

    /// One row of AKA fixture data, split into its eight columns.
    ///
    /// Rows are stored column by column and joined with an explicit tab when
    /// the fixture is built, so the tests do not depend on invisible tab
    /// characters surviving inside a string literal.
    type Row = [&'static str; 8];

    /// The literal `\N` placeholder that appears throughout the fixture
    /// rows (presumably the data set's marker for a missing value).
    const NULL: &str = r"\N";

    /// The header row of the fixture data.
    const HEADER: Row = [
        "titleId",
        "ordering",
        "title",
        "region",
        "language",
        "types",
        "attributes",
        "isOriginalTitle",
    ];

    /// Five records for title `tt0117019`.
    const TT0117019: &[Row] = &[
        ["tt0117019", "1", "Hommes à l'huile", "FR", NULL, NULL, NULL, "0"],
        ["tt0117019", "2", "Männer in Öl", "DE", NULL, NULL, NULL, "0"],
        ["tt0117019", "3", "Men in Oil", "XEU", "en", "festival", NULL, "0"],
        [
            "tt0117019",
            "4",
            "Männer in Öl: Annäherungsversuche an die Malerin Susanne Hay",
            NULL,
            NULL,
            "original",
            NULL,
            "1",
        ],
        ["tt0117019", "5", "Men in Oil", "XWW", "en", NULL, NULL, "0"],
    ];

    /// One record for title `tt0117020`.
    const TT0117020: &[Row] = &[[
        "tt0117020",
        "1",
        "Mendigos sin fronteras",
        "ES",
        NULL,
        NULL,
        NULL,
        "0",
    ]];

    /// Eight records for title `tt0117021`.
    const TT0117021: &[Row] = &[
        ["tt0117021", "1", "Menno's Mind", "US", NULL, NULL, NULL, "0"],
        ["tt0117021", "2", "Menno's Mind", NULL, NULL, "original", NULL, "1"],
        ["tt0117021", "3", "The Matrix 2", "RU", NULL, "video", NULL, "0"],
        ["tt0117021", "4", "Virtuális elme", "HU", NULL, "imdbDisplay", NULL, "0"],
        ["tt0117021", "5", "Power.com", "US", NULL, "video", NULL, "0"],
        ["tt0117021", "6", "La mente de Menno", "ES", NULL, NULL, NULL, "0"],
        ["tt0117021", "7", "Power.com", "CA", "en", "video", NULL, "0"],
        ["tt0117021", "8", "Terror im Computer", "DE", NULL, NULL, NULL, "0"],
    ];

    /// One record for title `tt0117022`.
    const TT0117022: &[Row] =
        &[["tt0117022", "1", "Menopause Song", "CA", NULL, NULL, NULL, "0"]];

    /// One record for title `tt0117023`.
    const TT0117023: &[Row] =
        &[["tt0117023", "1", "Les menteurs", "FR", NULL, NULL, NULL, "0"]];

    /// Build tab-separated data from the header followed by the given groups
    /// of rows, and run it through `AKAIndexRecords`.
    ///
    /// The generated data deliberately has no trailing newline, so the final
    /// record is terminated by end of input.
    fn index_records(groups: &[&[Row]]) -> Vec<AKAIndexRecord> {
        let mut data = HEADER.join("\t");
        for group in groups {
            for row in group.iter() {
                data.push('\n');
                data.push_str(&row.join("\t"));
            }
        }
        let rdr = csv_reader_builder().from_reader(data.as_bytes());
        AKAIndexRecords::new(rdr).collect::<Result<_>>().unwrap()
    }

    /// Assert that `records` consists of exactly the given `(id, count)`
    /// pairs, in order.
    fn assert_groups(records: &[AKAIndexRecord], expected: &[(&str, u64)]) {
        assert_eq!(records.len(), expected.len());
        for (record, &(id, count)) in records.iter().zip(expected) {
            assert_eq!(record.id, id.as_bytes());
            assert_eq!(record.count, count);
        }
    }

    /// Several groups, with the data ending on two single-record groups.
    #[test]
    fn aka_index_records1() {
        let records = index_records(&[
            TT0117019, TT0117020, TT0117021, TT0117022, TT0117023,
        ]);
        assert_groups(
            &records,
            &[
                ("tt0117019", 5),
                ("tt0117020", 1),
                ("tt0117021", 8),
                ("tt0117022", 1),
                ("tt0117023", 1),
            ],
        );
    }

    /// Several groups, with the data ending on a multi-record group.
    #[test]
    fn aka_index_records2() {
        let records = index_records(&[TT0117019, TT0117020, TT0117021]);
        assert_groups(
            &records,
            &[("tt0117019", 5), ("tt0117020", 1), ("tt0117021", 8)],
        );
    }

    /// A single group made of several records.
    #[test]
    fn aka_index_records3() {
        let records = index_records(&[TT0117021]);
        assert_groups(&records, &[("tt0117021", 8)]);
    }

    /// A single group made of a single record.
    #[test]
    fn aka_index_records4() {
        let records = index_records(&[&TT0117021[..1]]);
        assert_groups(&records, &[("tt0117021", 1)]);
    }
}