Skip to main content

v_common/ft_xapian/
xapian_reader.rs

1use v_authorization_impl::AzContext;
2use crate::ft_xapian::index_schema::IndexerSchema;
3use crate::ft_xapian::init_db_path;
4use crate::ft_xapian::key2slot::Key2Slot;
5use crate::ft_xapian::vql::TTA;
6use crate::ft_xapian::xapian_vql::{exec_xapian_query_and_queue_authorize, get_sorter, transform_vql_to_xapian, AuxContext};
7use crate::module::common::load_onto;
8use crate::module::info::ModuleInfo;
9use v_individual_model::onto::individual::Individual;
10use v_individual_model::onto::onto_impl::Onto;
11use v_individual_model::onto::onto_index::OntoIndex;
12use crate::search::common::{FTQuery, QueryResult};
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use crate::v_api::common_type::OptAuthorize;
17use futures::executor::block_on;
18use std::collections::HashMap;
19use std::io::{Error, ErrorKind};
20use std::time::Instant;
21use std::time::SystemTime;
22use xapian_rusty::*;
23
/// Limit handed to `QueryParser::set_max_wildcard_expansion` for every query parser built in this file.
const MAX_WILDCARD_EXPANSION: i32 = 20_000;
/// Base path passed to `ModuleInfo::new` when opening the `fulltext_indexer` module info.
const BASE_PATH: &str = "./data";
26
27pub struct DatabaseQueryParser {
28    db: Database,
29    qp: QueryParser,
30}
31
32impl DatabaseQueryParser {
33    fn add_database(&mut self, db_name: &str, opened_db: &mut HashMap<String, Database>) -> Result<()> {
34        if let Some(add_db) = opened_db.get_mut(db_name) {
35            self.db.add_database(add_db)?;
36        }
37        Ok(())
38    }
39}
40
41pub struct XapianReader {
42    pub index_schema: IndexerSchema,
43    pub onto: Onto,
44    pub onto_modified: SystemTime,
45    using_dbqp: HashMap<Vec<String>, DatabaseQueryParser>,
46    opened_db: HashMap<String, Database>,
47    xapian_stemmer: Stem,
48    xapian_lang: String,
49    mdif: ModuleInfo,
50    key2slot: Key2Slot,
51    db2path: HashMap<String, String>,
52    committed_op_id: i64,
53    az: AzContext,
54}
55
56impl XapianReader {
57    pub fn new(lang: &str, storage: &mut VStorage) -> Option<Self> {
58        let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
59        if indexer_module_info.is_err() {
60            error!("{:?}", indexer_module_info.err());
61            return None;
62        }
63
64        let mut onto = Onto::default();
65        load_onto(storage, &mut onto);
66
67        let mut xr = XapianReader {
68            using_dbqp: Default::default(),
69            opened_db: Default::default(),
70            xapian_stemmer: Stem::new(lang).unwrap(),
71            xapian_lang: lang.to_string(),
72            index_schema: Default::default(),
73            mdif: indexer_module_info.unwrap(),
74            key2slot: Key2Slot::load().unwrap_or_default(),
75            onto,
76            db2path: init_db_path(),
77            committed_op_id: 0,
78            onto_modified: SystemTime::now(),
79            az: AzContext::default(),
80        };
81
82        xr.load_index_schema(storage);
83
84        Some(xr)
85    }
86
87    pub fn new_without_init(lang: &str) -> Option<Self> {
88        let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
89        if indexer_module_info.is_err() {
90            error!("{:?}", indexer_module_info.err());
91            return None;
92        }
93
94        let xr = XapianReader {
95            using_dbqp: Default::default(),
96            opened_db: Default::default(),
97            xapian_stemmer: Stem::new(lang).unwrap(),
98            xapian_lang: lang.to_string(),
99            index_schema: Default::default(),
100            mdif: indexer_module_info.unwrap(),
101            key2slot: Key2Slot::load().unwrap_or_default(),
102            onto: Onto::default(),
103            db2path: init_db_path(),
104            committed_op_id: 0,
105            onto_modified: SystemTime::UNIX_EPOCH,
106            az: AzContext::default(),
107        };
108
109        Some(xr)
110    }
111
112    pub fn query_use_authorize(&mut self, request: FTQuery, storage: &mut VStorage, op_auth: OptAuthorize, reopen: bool) -> QueryResult {
113        if reopen {
114            if let Err(e) = self.reopen_dbs() {
115                error!("fail reopen xapian databases: {:?}", e);
116            }
117        }
118
119        let mut res_out_list = vec![];
120        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
121            ctx.push(id.to_owned());
122        }
123
124        if let Some(t) = OntoIndex::get_modified() {
125            if t > self.onto_modified {
126                load_onto(storage, &mut self.onto);
127                self.onto_modified = t;
128            }
129        }
130        if self.index_schema.is_empty() {
131            self.load_index_schema(storage);
132        }
133
134        if let Ok(mut res) = block_on(self.query_use_collect_fn(&request, add_out_element, op_auth, &mut res_out_list)) {
135            res.result = res_out_list;
136            debug!("res={:?}", res);
137            return res;
138        }
139        QueryResult::default()
140    }
141
142    pub fn query(&mut self, request: FTQuery, storage: &mut VStorage) -> QueryResult {
143        self.query_use_authorize(request, storage, OptAuthorize::YES, false)
144    }
145
146    pub async fn query_use_collect_fn<T>(
147        &mut self,
148        request: &FTQuery,
149        add_out_element: fn(uri: &str, ctx: &mut T),
150        op_auth: OptAuthorize,
151        out_list: &mut T,
152    ) -> Result<QueryResult> {
153        let total_time = Instant::now();
154        let mut sr = QueryResult::default();
155
156        let wtta = TTA::parse_expr(&request.query);
157
158        if wtta.is_none() {
159            error!("fail parse query (phase 1) [{}], tta is empty", request.query);
160            sr.result_code = ResultCode::BadRequest;
161            return Ok(sr);
162        }
163
164        if self.key2slot.is_need_reload()? {
165            self.key2slot = Key2Slot::load()?;
166        }
167
168        let mut tta = wtta.unwrap();
169
170        let db_names = self.get_dn_names(&tta, &request.databases);
171
172        debug!("db_names={:?}", db_names);
173        debug!(
174            "user_uri=[{}] query=[{}] str_sort=[{}], db_names=[{:?}], from=[{}], top=[{}], limit=[{}]",
175            request.user, request.query, request.sort, request.databases, request.from, request.top, request.limit
176        );
177        debug!("TTA [{}]", tta);
178
179        if let Some((_, new_committed_op_id)) = self.mdif.read_info() {
180            if new_committed_op_id > self.committed_op_id {
181                info!("search:reopen_db: new committed_op_id={} > prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
182                self.reopen_dbs()?;
183                self.committed_op_id = new_committed_op_id;
184            } else {
185                debug!("search:check reopen_db: new committed_op_id={}, prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
186            }
187        }
188
189        self.open_dbqp_if_need(&db_names)?;
190
191        let mut query = Query::new()?;
192        if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
193            let mut _rd: f64 = 0.0;
194            let mut ctx = AuxContext {
195                key2slot: &self.key2slot,
196                qp: &mut dbqp.qp,
197                onto: &self.onto,
198            };
199            transform_vql_to_xapian(&mut ctx, &mut tta, None, None, &mut query, &mut _rd, 0)?;
200        }
201
202        debug!("query={:?}", query.get_description());
203
204        if query.is_empty() {
205            sr.result_code = ResultCode::Ok;
206            warn!("query is empty [{}]", request.query);
207            return Ok(sr);
208        }
209
210        if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
211            let mut xapian_enquire = dbqp.db.new_enquire()?;
212
213            xapian_enquire.set_query(&mut query)?;
214
215            if let Some(s) = get_sorter(&request.sort, &self.key2slot)? {
216                xapian_enquire.set_sort_by_key(s, true)?;
217            }
218
219            sr = exec_xapian_query_and_queue_authorize(request, &mut xapian_enquire, add_out_element, op_auth, out_list, &mut self.az).await;
220        }
221
222        debug!("res={:?}", sr);
223        sr.total_time = total_time.elapsed().as_millis() as i64;
224        sr.query_time = sr.total_time - sr.authorize_time;
225
226        Ok(sr)
227    }
228
229    pub fn load_index_schema(&mut self, storage: &mut VStorage) {
230        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
231            ctx.push(id.to_owned());
232        }
233        let mut ctx: Vec<String> = vec![];
234
235        match block_on(self.query_use_collect_fn(
236            &FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"),
237            add_out_element,
238            OptAuthorize::NO,
239            &mut ctx,
240        )) {
241            Ok(res) => {
242                if res.result_code == ResultCode::Ok && res.count > 0 {
243                    for id in ctx.iter() {
244                        let indv = &mut Individual::default();
245                        if storage.get_individual(id, indv).is_ok() {
246                            self.index_schema.add_schema_data(&self.onto, indv);
247                        }
248                    }
249                } else {
250                    error!("fail load index schema, err={:?}", res.result_code);
251                }
252            },
253            Err(e) => match e {
254                XError::Xapian(code) => {
255                    error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
256                },
257                XError::Io(e) => {
258                    error!("fail load index schema, err={:?}", e);
259                },
260            },
261        }
262    }
263
264    pub async fn c_load_index_schema(&mut self, storage: &AStorage) {
265        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
266            ctx.push(id.to_owned());
267        }
268        let mut ctx: Vec<String> = vec![];
269
270        info!("start load index schema");
271        match self
272            .query_use_collect_fn(&FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"), add_out_element, OptAuthorize::NO, &mut ctx)
273            .await
274        {
275            Ok(res) => {
276                if res.result_code == ResultCode::Ok && res.count > 0 {
277                    for id in ctx.iter() {
278                        if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
279                            if res == ResultCode::Ok {
280                                self.index_schema.add_schema_data(&self.onto, &mut indv);
281                            }
282                        }
283                    }
284                } else {
285                    error!("fail load index schema, err={:?}", res.result_code);
286                }
287            },
288            Err(e) => match e {
289                XError::Xapian(code) => {
290                    error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
291                },
292                XError::Io(e) => {
293                    error!("fail load index schema, err={:?}", e);
294                },
295            },
296        }
297
298        info!("load index schema, size={}", self.index_schema.len());
299    }
300
301    fn reopen_dbs(&mut self) -> Result<()> {
302        for (_, el) in self.using_dbqp.iter_mut() {
303            el.db.reopen()?;
304            el.qp.set_database(&mut el.db)?;
305        }
306
307        for (_, db) in self.opened_db.iter_mut() {
308            db.reopen()?;
309        }
310
311        Ok(())
312    }
313
314    fn _close_dbs(&mut self) -> Result<()> {
315        for (_, el) in self.using_dbqp.iter_mut() {
316            el.db.close()?;
317        }
318
319        for (_, db) in self.opened_db.iter_mut() {
320            db.close()?;
321        }
322
323        Ok(())
324    }
325
326    fn open_db_if_need(&mut self, db_name: &str) -> Result<()> {
327        if !self.opened_db.contains_key(db_name) {
328            if let Some(path) = self.db2path.get(db_name) {
329                let db = Database::new_with_path(&("./".to_owned() + path), UNKNOWN)?;
330                self.opened_db.insert(db_name.to_owned(), db);
331            } else {
332                return Err(XError::from(Error::new(ErrorKind::Other, "db2path invalid")));
333            }
334        }
335        Ok(())
336    }
337
338    fn open_dbqp_if_need(&mut self, db_names: &[String]) -> Result<()> {
339        if !self.using_dbqp.contains_key(db_names) {
340            for el in db_names {
341                self.open_db_if_need(el)?;
342            }
343
344            let mut dbqp = DatabaseQueryParser {
345                db: Database::new()?,
346                qp: QueryParser::new()?,
347            };
348
349            for el in db_names {
350                self.open_db_if_need(el)?;
351                dbqp.add_database(el, &mut self.opened_db)?;
352            }
353
354            dbqp.qp.set_max_wildcard_expansion(MAX_WILDCARD_EXPANSION)?;
355
356            self.xapian_stemmer = Stem::new(&self.xapian_lang)?;
357
358            dbqp.qp.set_stemmer(&mut self.xapian_stemmer)?;
359
360            dbqp.qp.set_database(&mut dbqp.db)?;
361
362            self.using_dbqp.insert(db_names.to_vec(), dbqp);
363        }
364        /*
365           committed_op_id = get_info().committed_op_id;
366        */
367        Ok(())
368    }
369
370    fn get_dn_names(&self, tta: &TTA, db_names_str: &str) -> Vec<String> {
371        let mut db_names = vec![];
372
373        if db_names_str.is_empty() {
374            let mut databases = HashMap::new();
375            self.db_names_from_tta(tta, &mut databases);
376
377            for (key, value) in databases.iter() {
378                if !(*value) {
379                    if key != "not-indexed" {
380                        db_names.push(key.to_owned());
381                    }
382
383                    // при автоопределении баз, если находится база deleted, то другие базы исключаются
384                    if key == "deleted" {
385                        db_names.clear();
386                        db_names.push(key.to_owned());
387                        break;
388                    }
389                }
390            }
391        } else {
392            for el in db_names_str.split(',') {
393                db_names.push(String::from(el).trim().to_owned());
394            }
395        }
396
397        if db_names.is_empty() {
398            db_names.push("base".to_owned());
399        }
400
401        db_names
402    }
403
404    fn db_names_from_tta(&self, tta: &TTA, db_names: &mut HashMap<String, bool>) -> String {
405        let mut ll = String::default();
406        let mut rr = String::default();
407
408        if let Some(l) = &tta.l {
409            ll = self.db_names_from_tta(l, db_names)
410        };
411
412        if let Some(r) = &tta.r {
413            rr = self.db_names_from_tta(r, db_names);
414        }
415
416        if !ll.is_empty() && !rr.is_empty() {
417            if ll == "rdf:type" {
418                let dbn = self.index_schema.get_dbname_of_class(&rr);
419                db_names.insert(dbn.to_owned(), false);
420            } else if ll == "v-s:deleted" {
421                db_names.insert("deleted".to_owned(), false);
422            }
423        }
424
425        tta.op.to_owned()
426    }
427}