1use v_authorization_lmdb_impl::AzContext;
2use crate::ft_xapian::index_schema::IndexerSchema;
3use crate::ft_xapian::init_db_path;
4use crate::ft_xapian::key2slot::Key2Slot;
5use crate::ft_xapian::vql::TTA;
6use crate::ft_xapian::xapian_vql::{exec_xapian_query_and_queue_authorize, get_sorter, transform_vql_to_xapian, AuxContext};
7use crate::module::common::load_onto;
8use crate::module::info::ModuleInfo;
9use v_individual_model::onto::individual::Individual;
10use v_individual_model::onto::onto_impl::Onto;
11use v_individual_model::onto::onto_index::OntoIndex;
12use crate::search::common::{FTQuery, QueryResult};
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use crate::v_api::common_type::OptAuthorize;
17use crate::v_authorization::common::AuthorizationContext;
18use v_authorization::common::Access;
19use futures::executor::block_on;
20use std::collections::HashMap;
21use std::io::{Error, ErrorKind};
22use std::time::Instant;
23use std::time::SystemTime;
24use xapian_rusty::*;
25
/// Upper bound handed to `QueryParser::set_max_wildcard_expansion` for every
/// query parser created by this module.
const MAX_WILDCARD_EXPANSION: i32 = 20_000;

/// Base directory passed to `ModuleInfo::new` when opening the
/// `fulltext_indexer` module info.
const BASE_PATH: &str = "./data";
28
29pub struct DatabaseQueryParser {
30 db: Database,
31 qp: QueryParser,
32}
33
34impl DatabaseQueryParser {
35 fn add_database(&mut self, db_name: &str, opened_db: &mut HashMap<String, Database>) -> Result<()> {
36 if let Some(add_db) = opened_db.get_mut(db_name) {
37 self.db.add_database(add_db)?;
38 }
39 Ok(())
40 }
41}
42
43pub struct XapianReader {
44 pub index_schema: IndexerSchema,
45 pub onto: Onto,
46 pub onto_modified: SystemTime,
47 using_dbqp: HashMap<Vec<String>, DatabaseQueryParser>,
48 opened_db: HashMap<String, Database>,
49 xapian_stemmer: Stem,
50 xapian_lang: String,
51 mdif: ModuleInfo,
52 key2slot: Key2Slot,
53 db2path: HashMap<String, String>,
54 committed_op_id: i64,
55 az: AzContext,
56}
57
58impl XapianReader {
59 pub fn new(lang: &str, storage: &mut VStorage) -> Option<Self> {
60 let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
61 if indexer_module_info.is_err() {
62 error!("{:?}", indexer_module_info.err());
63 return None;
64 }
65
66 let mut onto = Onto::default();
67 load_onto(storage, &mut onto);
68
69 let mut xr = XapianReader {
70 using_dbqp: Default::default(),
71 opened_db: Default::default(),
72 xapian_stemmer: Stem::new(lang).unwrap(),
73 xapian_lang: lang.to_string(),
74 index_schema: Default::default(),
75 mdif: indexer_module_info.unwrap(),
76 key2slot: Key2Slot::load().unwrap_or_default(),
77 onto,
78 db2path: init_db_path(),
79 committed_op_id: 0,
80 onto_modified: SystemTime::now(),
81 az: AzContext::default(),
82 };
83
84 xr.load_index_schema(storage);
85
86 Some(xr)
87 }
88
89 pub fn new_without_init(lang: &str) -> Option<Self> {
90 let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
91 if indexer_module_info.is_err() {
92 error!("{:?}", indexer_module_info.err());
93 return None;
94 }
95
96 let xr = XapianReader {
97 using_dbqp: Default::default(),
98 opened_db: Default::default(),
99 xapian_stemmer: Stem::new(lang).unwrap(),
100 xapian_lang: lang.to_string(),
101 index_schema: Default::default(),
102 mdif: indexer_module_info.unwrap(),
103 key2slot: Key2Slot::load().unwrap_or_default(),
104 onto: Onto::default(),
105 db2path: init_db_path(),
106 committed_op_id: 0,
107 onto_modified: SystemTime::UNIX_EPOCH,
108 az: AzContext::default(),
109 };
110
111 Some(xr)
112 }
113
114 pub fn query_use_authorize(&mut self, request: FTQuery, storage: &mut VStorage, op_auth: OptAuthorize, reopen: bool) -> QueryResult {
115 if reopen {
116 if let Err(e) = self.reopen_dbs() {
117 error!("fail reopen xapian databases: {:?}", e);
118 }
119 }
120
121 let mut res_out_list = vec![];
122 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
123 ctx.push(id.to_owned());
124 }
125
126 if let Some(t) = OntoIndex::get_modified() {
127 if t > self.onto_modified {
128 load_onto(storage, &mut self.onto);
129 self.onto_modified = t;
130 }
131 }
132 if self.index_schema.is_empty() {
133 self.load_index_schema(storage);
134 }
135
136 if let Ok(mut res) = block_on(self.query_use_collect_fn(&request, add_out_element, op_auth, &mut res_out_list)) {
137 res.result = res_out_list;
138 debug!("res={:?}", res);
139 return res;
140 }
141 QueryResult::default()
142 }
143
144 pub fn query(&mut self, request: FTQuery, storage: &mut VStorage) -> QueryResult {
145 self.query_use_authorize(request, storage, OptAuthorize::YES, false)
146 }
147
148 pub async fn query_use_collect_fn<T>(
149 &mut self,
150 request: &FTQuery,
151 add_out_element: fn(uri: &str, ctx: &mut T),
152 op_auth: OptAuthorize,
153 out_list: &mut T,
154 ) -> Result<QueryResult> {
155 let mut az = std::mem::take(&mut self.az);
156 let result = self
157 .query_use_collect_with_auth_fn(
158 request,
159 add_out_element,
160 op_auth,
161 out_list,
162 &mut |pairs: &[(&str, &str)]| {
163 pairs
164 .iter()
165 .map(|(subject_id, user_id)| match az.authorize(subject_id, user_id, Access::CanRead as u8, true) {
166 Ok(result) => result == Access::CanRead as u8,
167 Err(e) => {
168 error!("Ошибка при проверке авторизации: subject_id=[{}], user_id=[{}], error=[{:?}]", subject_id, user_id, e);
169 false
170 },
171 })
172 .collect()
173 },
174 1,
175 )
176 .await;
177 self.az = az;
178 result
179 }
180
181 pub async fn query_use_collect_with_auth_fn<T, F>(
187 &mut self,
188 request: &FTQuery,
189 add_out_element: fn(uri: &str, ctx: &mut T),
190 op_auth: OptAuthorize,
191 out_list: &mut T,
192 authorize: &mut F,
193 batch_size: usize,
194 ) -> Result<QueryResult>
195 where
196 F: FnMut(&[(&str, &str)]) -> Vec<bool>,
197 {
198 let total_time = Instant::now();
199 let mut sr = QueryResult::default();
200
201 let wtta = TTA::parse_expr(&request.query);
202
203 if wtta.is_none() {
204 error!("fail parse query (phase 1) [{}], tta is empty", request.query);
205 sr.result_code = ResultCode::BadRequest;
206 return Ok(sr);
207 }
208
209 if self.key2slot.is_need_reload()? {
210 self.key2slot = Key2Slot::load()?;
211 }
212
213 let mut tta = wtta.unwrap();
214
215 let db_names = self.get_dn_names(&tta, &request.databases);
216
217 debug!("db_names={:?}", db_names);
218 debug!(
219 "user_uri=[{}] query=[{}] str_sort=[{}], db_names=[{:?}], from=[{}], top=[{}], limit=[{}]",
220 request.user, request.query, request.sort, request.databases, request.from, request.top, request.limit
221 );
222 debug!("TTA [{}]", tta);
223
224 if let Some((_, new_committed_op_id)) = self.mdif.read_info() {
225 if new_committed_op_id > self.committed_op_id {
226 info!("search:reopen_db: new committed_op_id={} > prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
227 self.reopen_dbs()?;
228 self.committed_op_id = new_committed_op_id;
229 } else {
230 debug!("search:check reopen_db: new committed_op_id={}, prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
231 }
232 }
233
234 self.open_dbqp_if_need(&db_names)?;
235
236 let mut query = Query::new()?;
237 if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
238 let mut _rd: f64 = 0.0;
239 let mut ctx = AuxContext {
240 key2slot: &self.key2slot,
241 qp: &mut dbqp.qp,
242 onto: &self.onto,
243 };
244 transform_vql_to_xapian(&mut ctx, &mut tta, None, None, &mut query, &mut _rd, 0)?;
245 }
246
247 debug!("query={:?}", query.get_description());
248
249 if query.is_empty() {
250 sr.result_code = ResultCode::Ok;
251 warn!("query is empty [{}]", request.query);
252 return Ok(sr);
253 }
254
255 if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
256 let mut xapian_enquire = dbqp.db.new_enquire()?;
257
258 xapian_enquire.set_query(&mut query)?;
259
260 if let Some(s) = get_sorter(&request.sort, &self.key2slot)? {
261 xapian_enquire.set_sort_by_key(s, true)?;
262 }
263
264 sr = exec_xapian_query_and_queue_authorize(request, &mut xapian_enquire, add_out_element, op_auth, out_list, authorize, batch_size).await;
265 }
266
267 debug!("res={:?}", sr);
268 sr.total_time = total_time.elapsed().as_millis() as i64;
269 sr.query_time = sr.total_time - sr.authorize_time;
270
271 Ok(sr)
272 }
273
274 pub fn load_index_schema(&mut self, storage: &mut VStorage) {
275 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
276 ctx.push(id.to_owned());
277 }
278 let mut ctx: Vec<String> = vec![];
279
280 match block_on(self.query_use_collect_fn(
281 &FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"),
282 add_out_element,
283 OptAuthorize::NO,
284 &mut ctx,
285 )) {
286 Ok(res) => {
287 if res.result_code == ResultCode::Ok && res.count > 0 {
288 for id in ctx.iter() {
289 let indv = &mut Individual::default();
290 if storage.get_individual(id, indv).is_ok() {
291 self.index_schema.add_schema_data(&self.onto, indv);
292 }
293 }
294 } else {
295 error!("fail load index schema, err={:?}", res.result_code);
296 }
297 },
298 Err(e) => match e {
299 XError::Xapian(code) => {
300 error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
301 },
302 XError::Io(e) => {
303 error!("fail load index schema, err={:?}", e);
304 },
305 },
306 }
307 }
308
309 pub async fn c_load_index_schema(&mut self, storage: &AStorage) {
310 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
311 ctx.push(id.to_owned());
312 }
313 let mut ctx: Vec<String> = vec![];
314
315 info!("start load index schema");
316 match self
317 .query_use_collect_fn(&FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"), add_out_element, OptAuthorize::NO, &mut ctx)
318 .await
319 {
320 Ok(res) => {
321 if res.result_code == ResultCode::Ok && res.count > 0 {
322 for id in ctx.iter() {
323 if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
324 if res == ResultCode::Ok {
325 self.index_schema.add_schema_data(&self.onto, &mut indv);
326 }
327 }
328 }
329 } else {
330 error!("fail load index schema, err={:?}", res.result_code);
331 }
332 },
333 Err(e) => match e {
334 XError::Xapian(code) => {
335 error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
336 },
337 XError::Io(e) => {
338 error!("fail load index schema, err={:?}", e);
339 },
340 },
341 }
342
343 info!("load index schema, size={}", self.index_schema.len());
344 }
345
346 fn reopen_dbs(&mut self) -> Result<()> {
347 for (_, el) in self.using_dbqp.iter_mut() {
348 el.db.reopen()?;
349 el.qp.set_database(&mut el.db)?;
350 }
351
352 for (_, db) in self.opened_db.iter_mut() {
353 db.reopen()?;
354 }
355
356 Ok(())
357 }
358
359 fn _close_dbs(&mut self) -> Result<()> {
360 for (_, el) in self.using_dbqp.iter_mut() {
361 el.db.close()?;
362 }
363
364 for (_, db) in self.opened_db.iter_mut() {
365 db.close()?;
366 }
367
368 Ok(())
369 }
370
371 fn open_db_if_need(&mut self, db_name: &str) -> Result<()> {
372 if !self.opened_db.contains_key(db_name) {
373 if let Some(path) = self.db2path.get(db_name) {
374 let db = Database::new_with_path(&("./".to_owned() + path), UNKNOWN)?;
375 self.opened_db.insert(db_name.to_owned(), db);
376 } else {
377 return Err(XError::from(Error::new(ErrorKind::Other, "db2path invalid")));
378 }
379 }
380 Ok(())
381 }
382
383 fn open_dbqp_if_need(&mut self, db_names: &[String]) -> Result<()> {
384 if !self.using_dbqp.contains_key(db_names) {
385 for el in db_names {
386 self.open_db_if_need(el)?;
387 }
388
389 let mut dbqp = DatabaseQueryParser {
390 db: Database::new()?,
391 qp: QueryParser::new()?,
392 };
393
394 for el in db_names {
395 self.open_db_if_need(el)?;
396 dbqp.add_database(el, &mut self.opened_db)?;
397 }
398
399 dbqp.qp.set_max_wildcard_expansion(MAX_WILDCARD_EXPANSION)?;
400
401 self.xapian_stemmer = Stem::new(&self.xapian_lang)?;
402
403 dbqp.qp.set_stemmer(&mut self.xapian_stemmer)?;
404
405 dbqp.qp.set_database(&mut dbqp.db)?;
406
407 self.using_dbqp.insert(db_names.to_vec(), dbqp);
408 }
409 Ok(())
413 }
414
415 fn get_dn_names(&self, tta: &TTA, db_names_str: &str) -> Vec<String> {
416 let mut db_names = vec![];
417
418 if db_names_str.is_empty() {
419 let mut databases = HashMap::new();
420 self.db_names_from_tta(tta, &mut databases);
421
422 for (key, value) in databases.iter() {
423 if !(*value) {
424 if key != "not-indexed" {
425 db_names.push(key.to_owned());
426 }
427
428 if key == "deleted" {
430 db_names.clear();
431 db_names.push(key.to_owned());
432 break;
433 }
434 }
435 }
436 } else {
437 for el in db_names_str.split(',') {
438 db_names.push(String::from(el).trim().to_owned());
439 }
440 }
441
442 if db_names.is_empty() {
443 db_names.push("base".to_owned());
444 }
445
446 db_names
447 }
448
449 fn db_names_from_tta(&self, tta: &TTA, db_names: &mut HashMap<String, bool>) -> String {
450 let mut ll = String::default();
451 let mut rr = String::default();
452
453 if let Some(l) = &tta.l {
454 ll = self.db_names_from_tta(l, db_names)
455 };
456
457 if let Some(r) = &tta.r {
458 rr = self.db_names_from_tta(r, db_names);
459 }
460
461 if !ll.is_empty() && !rr.is_empty() {
462 if ll == "rdf:type" {
463 let dbn = self.index_schema.get_dbname_of_class(&rr);
464 db_names.insert(dbn.to_owned(), false);
465 } else if ll == "v-s:deleted" {
466 db_names.insert("deleted".to_owned(), false);
467 }
468 }
469
470 tta.op.to_owned()
471 }
472}