use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Instant;
use csv;
use memmap::Mmap;
use serde::{Deserialize, Serialize};
use serde_json;
use crate::error::{Error, Result};
use crate::record::{Episode, Rating, Title, TitleKind};
use crate::scored::SearchResults;
use crate::util::{
create_file, csv_file, csv_mmap, open_file, NiceDuration, IMDB_BASICS,
};
pub use self::aka::AKARecordIter;
pub use self::names::{NameQuery, NameScorer, NgramType};
mod aka;
mod episode;
mod id;
mod names;
mod rating;
#[cfg(test)]
mod tests;
mod writer;
// Version of the on-disk index format. `IndexBuilder::open` refuses to read
// an index whose recorded version differs from this value.
const VERSION: u64 = 1;
// File name (within the index directory) of the title id -> CSV byte offset
// map read by `id::IndexReader`.
const TITLE: &str = "title.fst";
// File name (within the index directory) of the JSON config, which currently
// records only the index format version.
const CONFIG: &str = "config.json";
/// A single media entity: a title record together with its optional episode
/// and rating data, as assembled by `Index::entity_from_title`.
#[derive(Clone, Debug)]
pub struct MediaEntity {
    // The core title record; always present.
    title: Title,
    // Episode details; populated only when the title is a TV episode.
    episode: Option<Episode>,
    // The title's rating, when one exists in the rating index.
    rating: Option<Rating>,
}
impl MediaEntity {
    /// Returns the underlying title record for this entity.
    pub fn title(&self) -> &Title {
        &self.title
    }

    /// Returns the episode details for this entity, if it is a TV episode.
    pub fn episode(&self) -> Option<&Episode> {
        self.episode.as_ref()
    }

    /// Returns the rating for this entity, if one was indexed.
    pub fn rating(&self) -> Option<&Rating> {
        self.rating.as_ref()
    }
}
/// A handle to an IMDb index on disk.
///
/// Bundles a memory-mapped reader over the basics CSV with readers for each
/// sub-index (names, AKAs, episodes, ratings and the title-id map). Record
/// lookups resolve a title id to a byte offset and then seek the CSV reader
/// directly to that record.
#[derive(Debug)]
pub struct Index {
    // Directory containing the raw IMDb data files.
    data_dir: PathBuf,
    // Directory containing the built index files.
    index_dir: PathBuf,
    // Memory-mapped CSV reader over the basics data; records are read by
    // seeking to byte offsets stored in the sub-indexes.
    csv_basic: csv::Reader<io::Cursor<Mmap>>,
    // Name search index.
    idx_names: names::IndexReader,
    // Alternate-name (AKA) index.
    idx_aka: aka::Index,
    // TV episode index.
    idx_episode: episode::Index,
    // Rating index.
    idx_rating: rating::Index,
    // Maps a title id to the byte offset of its record in `csv_basic`.
    idx_title: id::IndexReader,
}
/// On-disk index configuration, stored as JSON in the `config.json` file of
/// the index directory.
#[derive(Debug, Deserialize, Serialize)]
struct Config {
    // Index format version; must equal `VERSION` for the index to open.
    version: u64,
}
impl Index {
pub fn open<P1: AsRef<Path>, P2: AsRef<Path>>(
data_dir: P1,
index_dir: P2,
) -> Result<Index> {
IndexBuilder::new().open(data_dir, index_dir)
}
pub fn create<P1: AsRef<Path>, P2: AsRef<Path>>(
data_dir: P1,
index_dir: P2,
) -> Result<Index> {
IndexBuilder::new().create(data_dir, index_dir)
}
pub fn try_clone(&self) -> Result<Index> {
Index::open(&self.data_dir, &self.index_dir)
}
pub fn search(
&mut self,
query: &names::NameQuery,
) -> Result<SearchResults<Title>> {
let mut results = SearchResults::new();
for result in self.idx_names.search(query) {
let title = match self.read_record(*result.value())? {
None => continue,
Some(title) => title,
};
results.push(result.map(|_| title));
}
Ok(results)
}
pub fn entity(&mut self, id: &str) -> Result<Option<MediaEntity>> {
match self.title(id)? {
None => Ok(None),
Some(title) => self.entity_from_title(title).map(Some),
}
}
pub fn entity_from_title(&mut self, title: Title) -> Result<MediaEntity> {
let episode = match title.kind {
TitleKind::TVEpisode => self.episode(&title.id)?,
_ => None,
};
let rating = self.rating(&title.id)?;
Ok(MediaEntity { title, episode, rating })
}
pub fn title(&mut self, id: &str) -> Result<Option<Title>> {
match self.idx_title.get(id.as_bytes()) {
None => Ok(None),
Some(offset) => self.read_record(offset),
}
}
pub fn aka_records(&mut self, id: &str) -> Result<AKARecordIter> {
self.idx_aka.find(id.as_bytes())
}
pub fn rating(&mut self, id: &str) -> Result<Option<Rating>> {
self.idx_rating.rating(id.as_bytes())
}
pub fn seasons(&mut self, tvshow_id: &str) -> Result<Vec<Episode>> {
self.idx_episode.seasons(tvshow_id.as_bytes())
}
pub fn episodes(
&mut self,
tvshow_id: &str,
season: u32,
) -> Result<Vec<Episode>> {
self.idx_episode.episodes(tvshow_id.as_bytes(), season)
}
pub fn episode(&mut self, episode_id: &str) -> Result<Option<Episode>> {
self.idx_episode.episode(episode_id.as_bytes())
}
pub fn data_dir(&self) -> &Path {
&self.data_dir
}
pub fn index_dir(&self) -> &Path {
&self.index_dir
}
fn read_record(&mut self, offset: u64) -> Result<Option<Title>> {
let mut pos = csv::Position::new();
pos.set_byte(offset);
self.csv_basic.seek(pos).map_err(Error::csv)?;
let mut record = csv::StringRecord::new();
if !self.csv_basic.read_record(&mut record).map_err(Error::csv)? {
Ok(None)
} else {
let headers = self.csv_basic.headers().map_err(Error::csv)?;
Ok(record.deserialize(Some(headers)).map_err(Error::csv)?)
}
}
}
/// A builder for configuring how an index is created and opened.
#[derive(Debug)]
pub struct IndexBuilder {
    // The kind of ngram generation used for the name index.
    ngram_type: NgramType,
    // The size of each ngram used for the name index.
    ngram_size: usize,
}
impl IndexBuilder {
    /// Returns a builder with default settings: the default ngram type and
    /// an ngram size of `3`.
    pub fn new() -> IndexBuilder {
        IndexBuilder { ngram_type: NgramType::default(), ngram_size: 3 }
    }

    /// Opens a previously created index in `index_dir` built from the IMDb
    /// data in `data_dir`.
    ///
    /// Returns an error when the config file is missing or unreadable, or
    /// when the recorded format version does not match this build.
    pub fn open<P1: AsRef<Path>, P2: AsRef<Path>>(
        &self,
        data_dir: P1,
        index_dir: P2,
    ) -> Result<Index> {
        let data_dir = data_dir.as_ref();
        let index_dir = index_dir.as_ref();
        log::debug!("opening index {}", index_dir.display());

        let config: Config =
            serde_json::from_reader(open_file(index_dir.join(CONFIG))?)
                .map_err(|e| Error::config(e.to_string()))?;
        if config.version != VERSION {
            return Err(Error::version(VERSION, config.version));
        }
        // SAFETY: `csv_mmap` memory-maps the basics file; soundness relies
        // on the file not being truncated/modified while the map is live,
        // the same contract as every other `csv_mmap` call in this crate.
        let csv_basic = unsafe { csv_mmap(data_dir.join(IMDB_BASICS))? };
        Ok(Index {
            data_dir: data_dir.to_path_buf(),
            index_dir: index_dir.to_path_buf(),
            csv_basic,
            idx_names: names::IndexReader::open(index_dir)?,
            idx_aka: aka::Index::open(data_dir, index_dir)?,
            idx_episode: episode::Index::open(index_dir)?,
            idx_rating: rating::Index::open(index_dir)?,
            idx_title: id::IndexReader::from_path(index_dir.join(TITLE))?,
        })
    }

    /// Builds a complete index from the IMDb data in `data_dir`, writing
    /// it into `index_dir`, and then opens the result.
    ///
    /// The rating and episode indexes are built on a background thread
    /// while the AKA and name indexes are built on the calling thread.
    pub fn create<P1: AsRef<Path>, P2: AsRef<Path>>(
        &self,
        data_dir: P1,
        index_dir: P2,
    ) -> Result<Index> {
        let data_dir = data_dir.as_ref();
        let index_dir = index_dir.as_ref();
        fs::create_dir_all(index_dir)
            .map_err(|e| Error::io_path(e, index_dir))?;
        log::info!("creating index at {}", index_dir.display());

        // Kick off the rating and episode index builds in the background;
        // the thread owns its own copies of both paths.
        let background = {
            let data_dir = data_dir.to_path_buf();
            let index_dir = index_dir.to_path_buf();
            thread::spawn(move || -> Result<()> {
                let timer = Instant::now();
                rating::Index::create(&data_dir, &index_dir)?;
                log::info!(
                    "created rating index (took {})",
                    NiceDuration::since(timer)
                );

                let timer = Instant::now();
                episode::Index::create(&data_dir, &index_dir)?;
                log::info!(
                    "created episode index (took {})",
                    NiceDuration::since(timer)
                );
                Ok(())
            })
        };

        // The AKA index must exist before the name index, which reads it.
        let timer = Instant::now();
        let mut aka_index = aka::Index::create(data_dir, index_dir)?;
        log::info!("created AKA index (took {})", NiceDuration::since(timer));

        let timer = Instant::now();
        create_name_index(
            &mut aka_index,
            data_dir,
            index_dir,
            self.ngram_type,
            self.ngram_size,
        )?;
        log::info!(
            "created name index, ngram type: {}, ngram size: {} (took {})",
            self.ngram_type,
            self.ngram_size,
            NiceDuration::since(timer)
        );

        // Re-raise a background panic, then propagate its Result.
        background.join().unwrap()?;

        // Write the config last, after every sub-index has been built.
        serde_json::to_writer_pretty(
            create_file(index_dir.join(CONFIG))?,
            &Config { version: VERSION },
        )
        .map_err(|e| Error::config(e.to_string()))?;
        self.open(data_dir, index_dir)
    }

    /// Sets the kind of ngram generation to use for the name index.
    pub fn ngram_type(&mut self, ngram_type: NgramType) -> &mut IndexBuilder {
        self.ngram_type = ngram_type;
        self
    }

    /// Sets the ngram size to use for the name index.
    pub fn ngram_size(&mut self, ngram_size: usize) -> &mut IndexBuilder {
        self.ngram_size = ngram_size;
        self
    }
}
impl Default for IndexBuilder {
    /// Equivalent to `IndexBuilder::new()`.
    fn default() -> IndexBuilder {
        IndexBuilder::new()
    }
}
fn create_name_index(
aka_index: &mut aka::Index,
data_dir: &Path,
index_dir: &Path,
ngram_type: NgramType,
ngram_size: usize,
) -> Result<()> {
let (mut count, mut title_count) = (0u64, 0u64);
let mut wtr = names::IndexWriter::open(index_dir, ngram_type, ngram_size)?;
let mut twtr = id::IndexSortedWriter::from_path(index_dir.join(TITLE))?;
let mut rdr = csv_file(data_dir.join(IMDB_BASICS))?;
let mut record = csv::StringRecord::new();
while rdr.read_record(&mut record).map_err(Error::csv)? {
let pos = record.position().expect("position on row");
let id = &record[0];
let title = &record[2];
let original_title = &record[3];
let is_adult = &record[4] == "1";
if is_adult {
continue;
}
count += 1;
title_count += 1;
twtr.insert(id.as_bytes(), pos.byte())?;
wtr.insert(pos.byte(), title)?;
if title != original_title {
wtr.insert(pos.byte(), original_title)?;
count += 1;
}
for result in aka_index.find(id.as_bytes())? {
let akarecord = result?;
if title != akarecord.title {
wtr.insert(pos.byte(), &akarecord.title)?;
count += 1;
}
}
}
wtr.finish()?;
twtr.finish()?;
log::info!("{} titles indexed", title_count);
log::info!("{} total names indexed", count);
Ok(())
}