use super::*;
use std::time::Instant;
use tantivy::{
    collector::TopDocs, query::QueryParser, schema::Schema, Index, IndexReader, IndexWriter,
    ReloadPolicy,
};

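/// Full-text indexer backed by an in-memory tantivy index.
///
/// Owns the schema fields, the index itself, a reader used for searching,
/// and a writer used to (re)build the index from chat records.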
pub struct ContentIndexer {
    fields: Fields,
    index: Index,
    reader: IndexReader,
    writer: IndexWriter,
}

impl ContentIndexer {
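    /// Builds an indexer with the default field set, an in-RAM index,
    /// and a writer sized to the number of available CPU cores.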
    pub fn new() -> ChatRecordResult<Self> {
        let fields = Fields::default();
        let (index, reader) = Self::get_index_handle(fields.schema.clone())?;
        let writer = Self::get_index_writer(&index)?;

        Ok(Self {
            fields,
            index,
            reader,
            writer,
        })
    }

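    /// Creates the in-RAM index for the given schema, registers the custom
    /// tokenizers, and opens a reader that reloads only when asked to.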
    fn get_index_handle(schema: Schema) -> ChatRecordResult<(Index, IndexReader)> {
        let index = Index::create_in_ram(schema);
        tokenizers_register(index.tokenizers());
        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::Manual)
            .try_into()?;
        Ok((index, reader))
    }

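    /// Opens a writer with one indexing thread per CPU core and roughly
    /// 10 MB of writer heap per thread.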
    fn get_index_writer(index: &Index) -> ChatRecordResult<IndexWriter> {
        let num = num_cpus::get();
        info!("Indexing thread num: {}", num);
        Ok(index.writer_with_num_threads(num, num * 10_000_000)?)
    }

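    /// Removes every document from the index and commits the deletion.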
    pub fn cleanup_index(&mut self) -> ChatRecordResult<()> {
        self.writer.delete_all_documents()?;
        self.writer.commit()?;
        Ok(())
    }

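    /// Indexes a batch of records, committing once at the end and reloading
    /// the reader so the new documents become searchable immediately.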
    pub fn gen_index<D: GetDocument>(&mut self, records: Vec<D>) -> ChatRecordResult<()> {
        let total = records.len() as f64;
        let mut last_percent = 0.0;
        let mut sw = Instant::now();
        for (i, metadata) in records.iter().enumerate() {
            self.writer
                .add_document(metadata.get_document(&self.fields)?);
            // Log progress roughly every 1%, but only for batches large enough to matter.
            if total > 200.0 && i as f64 / total - last_percent >= 0.01 {
                last_percent = i as f64 / total;
                debug!(
                    "current progress: {} / {}, {}%, {}ms",
                    i,
                    total,
                    last_percent * 100.0,
                    sw.elapsed().as_millis()
                );
                sw = Instant::now();
            }
        }
        self.writer.commit()?;
        futures::executor::block_on(self.writer.garbage_collect_files())?;
        self.reader.reload()?;
        Ok(())
    }

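    /// Runs a full-text query over the custom fields plus the main content
    /// field, ordered by the timestamp field, and returns the `idx` values of
    /// the matching documents after applying `offset` and `limit`. Negative
    /// offsets are treated as zero.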
    pub fn search(&self, offset: i64, limit: i64, query: &str) -> ChatRecordResult<Vec<i32>> {
        let offset = if offset > 0 { offset as usize } else { 0 };
        let searcher = self.reader.searcher();
        // Search across every custom field plus the main content field.
        let query_parser = QueryParser::for_index(
            &self.index,
            self.fields
                .custom
                .iter()
                .cloned()
                .chain(vec![self.fields.content])
                .collect(),
        );
        let collector = TopDocs::with_limit(offset + limit as usize)
            .order_by_u64_field(self.fields.timestamp);
        Ok(searcher
            .search(&query_parser.parse_query(query)?, &collector)?
            .iter()
            .skip(offset)
            .filter_map(|(_score, doc_address)| {
                use tantivy::schema::Value;
                // Pull the record's stored `idx` value back out of the matched document.
                searcher.doc(*doc_address).ok().and_then(|doc| {
                    doc.get_first(self.fields.idx).and_then(|val| match val {
                        Value::U64(val) => Some(*val as i32),
                        Value::I64(val) => Some(*val as i32),
                        _ => None,
                    })
                })
            })
            .collect())
    }
}