Skip to main content

v_common/ft_xapian/
xapian_reader.rs

1use v_authorization_lmdb_impl::AzContext;
2use crate::ft_xapian::index_schema::IndexerSchema;
3use crate::ft_xapian::init_db_path;
4use crate::ft_xapian::key2slot::Key2Slot;
5use crate::ft_xapian::vql::TTA;
6use crate::ft_xapian::xapian_vql::{exec_xapian_query_and_queue_authorize, get_sorter, transform_vql_to_xapian, AuxContext};
7use crate::module::common::load_onto;
8use crate::module::info::ModuleInfo;
9use v_individual_model::onto::individual::Individual;
10use v_individual_model::onto::onto_impl::Onto;
11use v_individual_model::onto::onto_index::OntoIndex;
12use crate::search::common::{FTQuery, QueryResult};
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use crate::v_api::common_type::OptAuthorize;
17use crate::v_authorization::common::AuthorizationContext;
18use v_authorization::common::Access;
19use futures::executor::block_on;
20use std::collections::HashMap;
21use std::io::{Error, ErrorKind};
22use std::time::Instant;
23use std::time::SystemTime;
24use xapian_rusty::*;
25
/// Limit handed to `QueryParser::set_max_wildcard_expansion` for every query parser built here.
const MAX_WILDCARD_EXPANSION: i32 = 20000;
/// Base directory passed to `ModuleInfo::new` when locating the `fulltext_indexer` module info.
const BASE_PATH: &str = "./data";
28
29pub struct DatabaseQueryParser {
30    db: Database,
31    qp: QueryParser,
32}
33
34impl DatabaseQueryParser {
35    fn add_database(&mut self, db_name: &str, opened_db: &mut HashMap<String, Database>) -> Result<()> {
36        if let Some(add_db) = opened_db.get_mut(db_name) {
37            self.db.add_database(add_db)?;
38        }
39        Ok(())
40    }
41}
42
/// Full-text search reader over the Xapian databases written by the `fulltext_indexer` module.
///
/// Holds the opened databases, cached query parsers, the ontology and the index schema used to
/// translate VQL queries into Xapian queries.
pub struct XapianReader {
    /// Index schema built from the `vdi:ClassIndex` individuals; used to map a class to the
    /// name of the database that indexes it.
    pub index_schema: IndexerSchema,
    /// Ontology used while translating queries and while building the index schema.
    pub onto: Onto,
    /// Modification time of the loaded ontology; compared against `OntoIndex::get_modified()`
    /// to decide whether the ontology must be reloaded.
    pub onto_modified: SystemTime,
    /// Combined database + query parser, cached per ordered list of logical database names.
    using_dbqp: HashMap<Vec<String>, DatabaseQueryParser>,
    /// Individually opened databases, keyed by logical database name.
    opened_db: HashMap<String, Database>,
    /// Stemmer handed to the query parsers; recreated whenever a new parser is built.
    xapian_stemmer: Stem,
    /// Language name used to create `xapian_stemmer`.
    xapian_lang: String,
    /// Module info of the indexer; `read_info()` supplies the latest committed operation id.
    mdif: ModuleInfo,
    /// Mapping of predicates to Xapian value slots; reloaded when `is_need_reload()` says so.
    key2slot: Key2Slot,
    /// Logical database name -> relative on-disk path.
    db2path: HashMap<String, String>,
    /// Last committed operation id seen; databases are reopened when the indexer reports a
    /// larger one.
    committed_op_id: i64,
    /// Authorization context used by `query_use_collect_fn`.
    az: AzContext,
}
57
58impl XapianReader {
59    pub fn new(lang: &str, storage: &mut VStorage) -> Option<Self> {
60        let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
61        if indexer_module_info.is_err() {
62            error!("{:?}", indexer_module_info.err());
63            return None;
64        }
65
66        let mut onto = Onto::default();
67        load_onto(storage, &mut onto);
68
69        let mut xr = XapianReader {
70            using_dbqp: Default::default(),
71            opened_db: Default::default(),
72            xapian_stemmer: Stem::new(lang).unwrap(),
73            xapian_lang: lang.to_string(),
74            index_schema: Default::default(),
75            mdif: indexer_module_info.unwrap(),
76            key2slot: Key2Slot::load().unwrap_or_default(),
77            onto,
78            db2path: init_db_path(),
79            committed_op_id: 0,
80            onto_modified: SystemTime::now(),
81            az: AzContext::default(),
82        };
83
84        xr.load_index_schema(storage);
85
86        Some(xr)
87    }
88
89    pub fn new_without_init(lang: &str) -> Option<Self> {
90        let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
91        if indexer_module_info.is_err() {
92            error!("{:?}", indexer_module_info.err());
93            return None;
94        }
95
96        let xr = XapianReader {
97            using_dbqp: Default::default(),
98            opened_db: Default::default(),
99            xapian_stemmer: Stem::new(lang).unwrap(),
100            xapian_lang: lang.to_string(),
101            index_schema: Default::default(),
102            mdif: indexer_module_info.unwrap(),
103            key2slot: Key2Slot::load().unwrap_or_default(),
104            onto: Onto::default(),
105            db2path: init_db_path(),
106            committed_op_id: 0,
107            onto_modified: SystemTime::UNIX_EPOCH,
108            az: AzContext::default(),
109        };
110
111        Some(xr)
112    }
113
114    pub fn query_use_authorize(&mut self, request: FTQuery, storage: &mut VStorage, op_auth: OptAuthorize, reopen: bool) -> QueryResult {
115        if reopen {
116            if let Err(e) = self.reopen_dbs() {
117                error!("fail reopen xapian databases: {:?}", e);
118            }
119        }
120
121        let mut res_out_list = vec![];
122        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
123            ctx.push(id.to_owned());
124        }
125
126        if let Some(t) = OntoIndex::get_modified() {
127            if t > self.onto_modified {
128                load_onto(storage, &mut self.onto);
129                self.onto_modified = t;
130            }
131        }
132        if self.index_schema.is_empty() {
133            self.load_index_schema(storage);
134        }
135
136        if let Ok(mut res) = block_on(self.query_use_collect_fn(&request, add_out_element, op_auth, &mut res_out_list)) {
137            res.result = res_out_list;
138            debug!("res={:?}", res);
139            return res;
140        }
141        QueryResult::default()
142    }
143
144    pub fn query(&mut self, request: FTQuery, storage: &mut VStorage) -> QueryResult {
145        self.query_use_authorize(request, storage, OptAuthorize::YES, false)
146    }
147
148    pub async fn query_use_collect_fn<T>(
149        &mut self,
150        request: &FTQuery,
151        add_out_element: fn(uri: &str, ctx: &mut T),
152        op_auth: OptAuthorize,
153        out_list: &mut T,
154    ) -> Result<QueryResult> {
155        let mut az = std::mem::take(&mut self.az);
156        let result = self
157            .query_use_collect_with_auth_fn(
158                request,
159                add_out_element,
160                op_auth,
161                out_list,
162                &mut |pairs: &[(&str, &str)]| {
163                    pairs
164                        .iter()
165                        .map(|(subject_id, user_id)| match az.authorize(subject_id, user_id, Access::CanRead as u8, true) {
166                            Ok(result) => result == Access::CanRead as u8,
167                            Err(e) => {
168                                error!("Ошибка при проверке авторизации: subject_id=[{}], user_id=[{}], error=[{:?}]", subject_id, user_id, e);
169                                false
170                            },
171                        })
172                        .collect()
173                },
174                1,
175            )
176            .await;
177        self.az = az;
178        result
179    }
180
181    /// Вариант [`query_use_collect_fn`] с внешней пакетной функцией авторизации.
182    /// Позволяет передать произвольную логику авторизации вместо встроенного `AzContext`.
183    ///
184    /// `authorize(pairs)` принимает N пар `(subject_id, user_id)` и возвращает N булевых результатов.
185    /// `batch_size` задаёт количество идентификаторов, передаваемых за один вызов авторизации.
186    pub async fn query_use_collect_with_auth_fn<T, F>(
187        &mut self,
188        request: &FTQuery,
189        add_out_element: fn(uri: &str, ctx: &mut T),
190        op_auth: OptAuthorize,
191        out_list: &mut T,
192        authorize: &mut F,
193        batch_size: usize,
194    ) -> Result<QueryResult>
195    where
196        F: FnMut(&[(&str, &str)]) -> Vec<bool>,
197    {
198        let total_time = Instant::now();
199        let mut sr = QueryResult::default();
200
201        let wtta = TTA::parse_expr(&request.query);
202
203        if wtta.is_none() {
204            error!("fail parse query (phase 1) [{}], tta is empty", request.query);
205            sr.result_code = ResultCode::BadRequest;
206            return Ok(sr);
207        }
208
209        if self.key2slot.is_need_reload()? {
210            self.key2slot = Key2Slot::load()?;
211        }
212
213        let mut tta = wtta.unwrap();
214
215        let db_names = self.get_dn_names(&tta, &request.databases);
216
217        debug!("db_names={:?}", db_names);
218        debug!(
219            "user_uri=[{}] query=[{}] str_sort=[{}], db_names=[{:?}], from=[{}], top=[{}], limit=[{}]",
220            request.user, request.query, request.sort, request.databases, request.from, request.top, request.limit
221        );
222        debug!("TTA [{}]", tta);
223
224        if let Some((_, new_committed_op_id)) = self.mdif.read_info() {
225            if new_committed_op_id > self.committed_op_id {
226                info!("search:reopen_db: new committed_op_id={} > prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
227                self.reopen_dbs()?;
228                self.committed_op_id = new_committed_op_id;
229            } else {
230                debug!("search:check reopen_db: new committed_op_id={}, prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
231            }
232        }
233
234        self.open_dbqp_if_need(&db_names)?;
235
236        let mut query = Query::new()?;
237        if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
238            let mut _rd: f64 = 0.0;
239            let mut ctx = AuxContext {
240                key2slot: &self.key2slot,
241                qp: &mut dbqp.qp,
242                onto: &self.onto,
243            };
244            transform_vql_to_xapian(&mut ctx, &mut tta, None, None, &mut query, &mut _rd, 0)?;
245        }
246
247        debug!("query={:?}", query.get_description());
248
249        if query.is_empty() {
250            sr.result_code = ResultCode::Ok;
251            warn!("query is empty [{}]", request.query);
252            return Ok(sr);
253        }
254
255        if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
256            let mut xapian_enquire = dbqp.db.new_enquire()?;
257
258            xapian_enquire.set_query(&mut query)?;
259
260            if let Some(s) = get_sorter(&request.sort, &self.key2slot)? {
261                xapian_enquire.set_sort_by_key(s, true)?;
262            }
263
264            sr = exec_xapian_query_and_queue_authorize(request, &mut xapian_enquire, add_out_element, op_auth, out_list, authorize, batch_size).await;
265        }
266
267        debug!("res={:?}", sr);
268        sr.total_time = total_time.elapsed().as_millis() as i64;
269        sr.query_time = sr.total_time - sr.authorize_time;
270
271        Ok(sr)
272    }
273
274    pub fn load_index_schema(&mut self, storage: &mut VStorage) {
275        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
276            ctx.push(id.to_owned());
277        }
278        let mut ctx: Vec<String> = vec![];
279
280        match block_on(self.query_use_collect_fn(
281            &FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"),
282            add_out_element,
283            OptAuthorize::NO,
284            &mut ctx,
285        )) {
286            Ok(res) => {
287                if res.result_code == ResultCode::Ok && res.count > 0 {
288                    for id in ctx.iter() {
289                        let indv = &mut Individual::default();
290                        if storage.get_individual(id, indv).is_ok() {
291                            self.index_schema.add_schema_data(&self.onto, indv);
292                        }
293                    }
294                } else {
295                    error!("fail load index schema, err={:?}", res.result_code);
296                }
297            },
298            Err(e) => match e {
299                XError::Xapian(code) => {
300                    error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
301                },
302                XError::Io(e) => {
303                    error!("fail load index schema, err={:?}", e);
304                },
305            },
306        }
307    }
308
309    pub async fn c_load_index_schema(&mut self, storage: &AStorage) {
310        fn add_out_element(id: &str, ctx: &mut Vec<String>) {
311            ctx.push(id.to_owned());
312        }
313        let mut ctx: Vec<String> = vec![];
314
315        info!("start load index schema");
316        match self
317            .query_use_collect_fn(&FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"), add_out_element, OptAuthorize::NO, &mut ctx)
318            .await
319        {
320            Ok(res) => {
321                if res.result_code == ResultCode::Ok && res.count > 0 {
322                    for id in ctx.iter() {
323                        if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
324                            if res == ResultCode::Ok {
325                                self.index_schema.add_schema_data(&self.onto, &mut indv);
326                            }
327                        }
328                    }
329                } else {
330                    error!("fail load index schema, err={:?}", res.result_code);
331                }
332            },
333            Err(e) => match e {
334                XError::Xapian(code) => {
335                    error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
336                },
337                XError::Io(e) => {
338                    error!("fail load index schema, err={:?}", e);
339                },
340            },
341        }
342
343        info!("load index schema, size={}", self.index_schema.len());
344    }
345
346    fn reopen_dbs(&mut self) -> Result<()> {
347        for (_, el) in self.using_dbqp.iter_mut() {
348            el.db.reopen()?;
349            el.qp.set_database(&mut el.db)?;
350        }
351
352        for (_, db) in self.opened_db.iter_mut() {
353            db.reopen()?;
354        }
355
356        Ok(())
357    }
358
359    fn _close_dbs(&mut self) -> Result<()> {
360        for (_, el) in self.using_dbqp.iter_mut() {
361            el.db.close()?;
362        }
363
364        for (_, db) in self.opened_db.iter_mut() {
365            db.close()?;
366        }
367
368        Ok(())
369    }
370
371    fn open_db_if_need(&mut self, db_name: &str) -> Result<()> {
372        if !self.opened_db.contains_key(db_name) {
373            if let Some(path) = self.db2path.get(db_name) {
374                let db = Database::new_with_path(&("./".to_owned() + path), UNKNOWN)?;
375                self.opened_db.insert(db_name.to_owned(), db);
376            } else {
377                return Err(XError::from(Error::new(ErrorKind::Other, "db2path invalid")));
378            }
379        }
380        Ok(())
381    }
382
383    fn open_dbqp_if_need(&mut self, db_names: &[String]) -> Result<()> {
384        if !self.using_dbqp.contains_key(db_names) {
385            for el in db_names {
386                self.open_db_if_need(el)?;
387            }
388
389            let mut dbqp = DatabaseQueryParser {
390                db: Database::new()?,
391                qp: QueryParser::new()?,
392            };
393
394            for el in db_names {
395                self.open_db_if_need(el)?;
396                dbqp.add_database(el, &mut self.opened_db)?;
397            }
398
399            dbqp.qp.set_max_wildcard_expansion(MAX_WILDCARD_EXPANSION)?;
400
401            self.xapian_stemmer = Stem::new(&self.xapian_lang)?;
402
403            dbqp.qp.set_stemmer(&mut self.xapian_stemmer)?;
404
405            dbqp.qp.set_database(&mut dbqp.db)?;
406
407            self.using_dbqp.insert(db_names.to_vec(), dbqp);
408        }
409        /*
410           committed_op_id = get_info().committed_op_id;
411        */
412        Ok(())
413    }
414
415    fn get_dn_names(&self, tta: &TTA, db_names_str: &str) -> Vec<String> {
416        let mut db_names = vec![];
417
418        if db_names_str.is_empty() {
419            let mut databases = HashMap::new();
420            self.db_names_from_tta(tta, &mut databases);
421
422            for (key, value) in databases.iter() {
423                if !(*value) {
424                    if key != "not-indexed" {
425                        db_names.push(key.to_owned());
426                    }
427
428                    // при автоопределении баз, если находится база deleted, то другие базы исключаются
429                    if key == "deleted" {
430                        db_names.clear();
431                        db_names.push(key.to_owned());
432                        break;
433                    }
434                }
435            }
436        } else {
437            for el in db_names_str.split(',') {
438                db_names.push(String::from(el).trim().to_owned());
439            }
440        }
441
442        if db_names.is_empty() {
443            db_names.push("base".to_owned());
444        }
445
446        db_names
447    }
448
449    fn db_names_from_tta(&self, tta: &TTA, db_names: &mut HashMap<String, bool>) -> String {
450        let mut ll = String::default();
451        let mut rr = String::default();
452
453        if let Some(l) = &tta.l {
454            ll = self.db_names_from_tta(l, db_names)
455        };
456
457        if let Some(r) = &tta.r {
458            rr = self.db_names_from_tta(r, db_names);
459        }
460
461        if !ll.is_empty() && !rr.is_empty() {
462            if ll == "rdf:type" {
463                let dbn = self.index_schema.get_dbname_of_class(&rr);
464                db_names.insert(dbn.to_owned(), false);
465            } else if ll == "v-s:deleted" {
466                db_names.insert("deleted".to_owned(), false);
467            }
468        }
469
470        tta.op.to_owned()
471    }
472}