1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
use super::*;
use std::time::Instant;
use tantivy::{
    collector::TopDocs, query::QueryParser, schema::Schema, Index, IndexReader, IndexWriter,
    ReloadPolicy,
};

/// RAM-backed full-text index over chat records, built on tantivy.
///
/// Owns the index together with a manually reloaded reader and a single
/// writer. Committed documents become visible to [`ContentIndexer::search`]
/// only after the reader is reloaded, which the committing methods here do.
pub struct ContentIndexer {
    /// Schema fields shared by indexing and searching.
    fields: Fields,
    index: Index,
    /// Reader opened with `ReloadPolicy::Manual`; reloaded after each commit.
    reader: IndexReader,
    writer: IndexWriter,
}

impl ContentIndexer {
    /// Creates an empty in-memory index using the default [`Fields`] schema.
    ///
    /// # Errors
    /// Fails if the reader or the index writer cannot be created.
    pub fn new() -> ChatRecordResult<Self> {
        let fields = Fields::default();
        let (index, reader) = Self::get_index_handle(fields.schema.clone())?;
        let writer = Self::get_index_writer(&index)?;
        Ok(Self {
            fields,
            index,
            reader,
            writer,
        })
    }

    /// Builds a RAM index for `schema`, registers the project tokenizers on it,
    /// and opens a reader that is refreshed only by explicit `reload()` calls.
    fn get_index_handle(schema: Schema) -> ChatRecordResult<(Index, IndexReader)> {
        let index = Index::create_in_ram(schema);
        tokenizers_register(index.tokenizers());
        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::Manual)
            .try_into()?;
        Ok((index, reader))
    }

    /// Opens a writer with one indexing thread per CPU reported by `num_cpus`
    /// and an overall heap budget of 10_000_000 bytes per thread.
    fn get_index_writer(index: &Index) -> ChatRecordResult<IndexWriter> {
        let num = num_cpus::get();
        info!("Indexing thread num: {}", num);
        Ok(index.writer_with_num_threads(num, num * 10_000_000)?)
    }

    /// Deletes every document, commits, and reloads the reader so subsequent
    /// searches no longer see the removed documents.
    ///
    /// # Errors
    /// Fails if the deletion, the commit, or the reader reload fails.
    pub fn cleanup_index(&mut self) -> ChatRecordResult<()> {
        self.writer.delete_all_documents()?;
        self.writer.commit()?;
        // The reader uses `ReloadPolicy::Manual`: without this reload, searches
        // would keep serving the pre-cleanup snapshot (same as in `gen_index`).
        self.reader.reload()?;
        Ok(())
    }

    /// Adds every record to the index, commits, garbage-collects obsolete index
    /// files, and reloads the reader so the new documents become searchable.
    ///
    /// For batches of more than 200 records, progress is logged at debug level
    /// whenever it has advanced by at least 1% since the previous log line,
    /// together with the milliseconds elapsed since that previous line.
    ///
    /// # Errors
    /// Fails if a record cannot be converted to a document, or if the commit,
    /// garbage collection, or reader reload fails. No rollback is performed on
    /// a conversion failure, so documents added before it are left in the
    /// writer and may be included by a later commit.
    pub fn gen_index<D: GetDocument>(&mut self, records: Vec<D>) -> ChatRecordResult<()> {
        let total = records.len() as f64;
        // Fraction of `total` at which progress was last reported.
        let mut last_reported = 0.0;
        let mut sw = Instant::now();
        for (i, metadata) in records.iter().enumerate() {
            self.writer
                .add_document(metadata.get_document(&self.fields)?);
            if total > 200.0 && i as f64 / total - last_reported >= 0.01 {
                last_reported = i as f64 / total;
                debug!(
                    "current progress: {} / {}, {}%, {}ms",
                    i,
                    total,
                    last_reported * 100.0,
                    sw.elapsed().as_millis()
                );
                sw = Instant::now();
            }
        }
        self.writer.commit()?;
        futures::executor::block_on(self.writer.garbage_collect_files())?;
        self.reader.reload()?;
        Ok(())
    }

    /// Searches the index and returns the `idx` values of the matching documents.
    ///
    /// `query` is parsed with every custom field plus the content field as the
    /// default search fields. Hits are ordered by the `timestamp` fast field via
    /// `TopDocs::order_by_u64_field`; the first `offset` hits are skipped and at
    /// most `limit` are returned. A negative `offset` is treated as 0, and a
    /// non-positive `limit` yields an empty result.
    ///
    /// Hits whose stored document cannot be loaded, has no integer `idx`, or has
    /// an `idx` that does not fit in an `i32` are skipped rather than reported.
    ///
    /// # Errors
    /// Fails if `query` cannot be parsed or the search itself fails.
    pub fn search(&self, offset: i64, limit: i64, query: &str) -> ChatRecordResult<Vec<i32>> {
        use std::convert::TryFrom;
        use tantivy::schema::Value;

        let default_fields: Vec<_> = self
            .fields
            .custom
            .iter()
            .cloned()
            .chain(std::iter::once(self.fields.content))
            .collect();
        // Parse before the limit check so malformed queries are always reported.
        let parsed = QueryParser::for_index(&self.index, default_fields).parse_query(query)?;

        // `TopDocs::with_limit` panics on 0, and `limit as usize` on a negative
        // value wraps to a huge number, so non-positive limits are handled here.
        if limit <= 0 {
            return Ok(Vec::new());
        }
        let limit = usize::try_from(limit).unwrap_or(usize::MAX);
        let offset = usize::try_from(offset.max(0)).unwrap_or(usize::MAX);

        let searcher = self.reader.searcher();
        let collector = TopDocs::with_limit(offset.saturating_add(limit))
            .order_by_u64_field(self.fields.timestamp);
        let hits = searcher.search(&parsed, &collector)?;

        Ok(hits
            .into_iter()
            .skip(offset)
            .filter_map(|(_timestamp, doc_address)| {
                let doc = searcher.doc(doc_address).ok()?;
                // Checked conversion: an out-of-range id must not be truncated
                // into a different, valid-looking record id.
                match doc.get_first(self.fields.idx)? {
                    Value::U64(val) => i32::try_from(*val).ok(),
                    Value::I64(val) => i32::try_from(*val).ok(),
                    _ => None,
                }
            })
            .collect())
    }
}