1use v_authorization_impl::AzContext;
2use crate::ft_xapian::index_schema::IndexerSchema;
3use crate::ft_xapian::init_db_path;
4use crate::ft_xapian::key2slot::Key2Slot;
5use crate::ft_xapian::vql::TTA;
6use crate::ft_xapian::xapian_vql::{exec_xapian_query_and_queue_authorize, get_sorter, transform_vql_to_xapian, AuxContext};
7use crate::module::common::load_onto;
8use crate::module::info::ModuleInfo;
9use v_individual_model::onto::individual::Individual;
10use v_individual_model::onto::onto_impl::Onto;
11use v_individual_model::onto::onto_index::OntoIndex;
12use crate::search::common::{FTQuery, QueryResult};
13use crate::storage::async_storage::{get_individual_from_db, AStorage};
14use v_storage::VStorage;
15use crate::v_api::common_type::ResultCode;
16use crate::v_api::common_type::OptAuthorize;
17use futures::executor::block_on;
18use std::collections::HashMap;
19use std::io::{Error, ErrorKind};
20use std::time::Instant;
21use std::time::SystemTime;
22use xapian_rusty::*;
23
/// Limit handed to `QueryParser::set_max_wildcard_expansion` for every query
/// parser created by `XapianReader::open_dbqp_if_need`.
const MAX_WILDCARD_EXPANSION: i32 = 20000;
/// Base directory passed to `ModuleInfo::new` when opening the
/// `fulltext_indexer` module info.
const BASE_PATH: &str = "./data";
26
/// A combined Xapian database together with the query parser bound to it.
///
/// One instance is cached per distinct list of database names
/// (see `XapianReader::using_dbqp`).
pub struct DatabaseQueryParser {
    // Combined database; the individually opened databases are attached to it via `add_database`.
    db: Database,
    // Query parser whose database is set to `db` (and re-set after every reopen).
    qp: QueryParser,
}
31
32impl DatabaseQueryParser {
33 fn add_database(&mut self, db_name: &str, opened_db: &mut HashMap<String, Database>) -> Result<()> {
34 if let Some(add_db) = opened_db.get_mut(db_name) {
35 self.db.add_database(add_db)?;
36 }
37 Ok(())
38 }
39}
40
/// Read side of the Xapian full-text index: translates VQL queries into
/// Xapian queries, runs them and hands the results to authorization.
pub struct XapianReader {
    /// Schema built from `vdi:ClassIndex` individuals; consulted to map a class to its database name.
    pub index_schema: IndexerSchema,
    /// Ontology loaded with `load_onto`; used by the schema and by query translation.
    pub onto: Onto,
    /// Time of the last ontology load, compared against `OntoIndex::get_modified()`.
    pub onto_modified: SystemTime,
    // Cache of combined database + query parser, keyed by the list of database names it covers.
    using_dbqp: HashMap<Vec<String>, DatabaseQueryParser>,
    // Individually opened databases, keyed by database name.
    opened_db: HashMap<String, Database>,
    // Stemmer handed to each new query parser; recreated from `xapian_lang` in `open_dbqp_if_need`.
    xapian_stemmer: Stem,
    // Language used to create `xapian_stemmer`.
    xapian_lang: String,
    // Module info of `fulltext_indexer`; `read_info()` supplies the indexer's committed op id.
    mdif: ModuleInfo,
    // Key/slot mapping used for sorting and query translation; reloaded when `is_need_reload()` says so.
    key2slot: Key2Slot,
    // Database name -> on-disk path, from `init_db_path()`.
    db2path: HashMap<String, String>,
    // Last committed op id seen from `mdif`; a newer one triggers `reopen_dbs`.
    committed_op_id: i64,
    // Authorization context passed to `exec_xapian_query_and_queue_authorize`.
    az: AzContext,
}
55
56impl XapianReader {
57 pub fn new(lang: &str, storage: &mut VStorage) -> Option<Self> {
58 let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
59 if indexer_module_info.is_err() {
60 error!("{:?}", indexer_module_info.err());
61 return None;
62 }
63
64 let mut onto = Onto::default();
65 load_onto(storage, &mut onto);
66
67 let mut xr = XapianReader {
68 using_dbqp: Default::default(),
69 opened_db: Default::default(),
70 xapian_stemmer: Stem::new(lang).unwrap(),
71 xapian_lang: lang.to_string(),
72 index_schema: Default::default(),
73 mdif: indexer_module_info.unwrap(),
74 key2slot: Key2Slot::load().unwrap_or_default(),
75 onto,
76 db2path: init_db_path(),
77 committed_op_id: 0,
78 onto_modified: SystemTime::now(),
79 az: AzContext::default(),
80 };
81
82 xr.load_index_schema(storage);
83
84 Some(xr)
85 }
86
87 pub fn new_without_init(lang: &str) -> Option<Self> {
88 let indexer_module_info = ModuleInfo::new(BASE_PATH, "fulltext_indexer", true);
89 if indexer_module_info.is_err() {
90 error!("{:?}", indexer_module_info.err());
91 return None;
92 }
93
94 let xr = XapianReader {
95 using_dbqp: Default::default(),
96 opened_db: Default::default(),
97 xapian_stemmer: Stem::new(lang).unwrap(),
98 xapian_lang: lang.to_string(),
99 index_schema: Default::default(),
100 mdif: indexer_module_info.unwrap(),
101 key2slot: Key2Slot::load().unwrap_or_default(),
102 onto: Onto::default(),
103 db2path: init_db_path(),
104 committed_op_id: 0,
105 onto_modified: SystemTime::UNIX_EPOCH,
106 az: AzContext::default(),
107 };
108
109 Some(xr)
110 }
111
112 pub fn query_use_authorize(&mut self, request: FTQuery, storage: &mut VStorage, op_auth: OptAuthorize, reopen: bool) -> QueryResult {
113 if reopen {
114 if let Err(e) = self.reopen_dbs() {
115 error!("fail reopen xapian databases: {:?}", e);
116 }
117 }
118
119 let mut res_out_list = vec![];
120 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
121 ctx.push(id.to_owned());
122 }
123
124 if let Some(t) = OntoIndex::get_modified() {
125 if t > self.onto_modified {
126 load_onto(storage, &mut self.onto);
127 self.onto_modified = t;
128 }
129 }
130 if self.index_schema.is_empty() {
131 self.load_index_schema(storage);
132 }
133
134 if let Ok(mut res) = block_on(self.query_use_collect_fn(&request, add_out_element, op_auth, &mut res_out_list)) {
135 res.result = res_out_list;
136 debug!("res={:?}", res);
137 return res;
138 }
139 QueryResult::default()
140 }
141
142 pub fn query(&mut self, request: FTQuery, storage: &mut VStorage) -> QueryResult {
143 self.query_use_authorize(request, storage, OptAuthorize::YES, false)
144 }
145
146 pub async fn query_use_collect_fn<T>(
147 &mut self,
148 request: &FTQuery,
149 add_out_element: fn(uri: &str, ctx: &mut T),
150 op_auth: OptAuthorize,
151 out_list: &mut T,
152 ) -> Result<QueryResult> {
153 let total_time = Instant::now();
154 let mut sr = QueryResult::default();
155
156 let wtta = TTA::parse_expr(&request.query);
157
158 if wtta.is_none() {
159 error!("fail parse query (phase 1) [{}], tta is empty", request.query);
160 sr.result_code = ResultCode::BadRequest;
161 return Ok(sr);
162 }
163
164 if self.key2slot.is_need_reload()? {
165 self.key2slot = Key2Slot::load()?;
166 }
167
168 let mut tta = wtta.unwrap();
169
170 let db_names = self.get_dn_names(&tta, &request.databases);
171
172 debug!("db_names={:?}", db_names);
173 debug!(
174 "user_uri=[{}] query=[{}] str_sort=[{}], db_names=[{:?}], from=[{}], top=[{}], limit=[{}]",
175 request.user, request.query, request.sort, request.databases, request.from, request.top, request.limit
176 );
177 debug!("TTA [{}]", tta);
178
179 if let Some((_, new_committed_op_id)) = self.mdif.read_info() {
180 if new_committed_op_id > self.committed_op_id {
181 info!("search:reopen_db: new committed_op_id={} > prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
182 self.reopen_dbs()?;
183 self.committed_op_id = new_committed_op_id;
184 } else {
185 debug!("search:check reopen_db: new committed_op_id={}, prev committed_op_id={}", new_committed_op_id, self.committed_op_id);
186 }
187 }
188
189 self.open_dbqp_if_need(&db_names)?;
190
191 let mut query = Query::new()?;
192 if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
193 let mut _rd: f64 = 0.0;
194 let mut ctx = AuxContext {
195 key2slot: &self.key2slot,
196 qp: &mut dbqp.qp,
197 onto: &self.onto,
198 };
199 transform_vql_to_xapian(&mut ctx, &mut tta, None, None, &mut query, &mut _rd, 0)?;
200 }
201
202 debug!("query={:?}", query.get_description());
203
204 if query.is_empty() {
205 sr.result_code = ResultCode::Ok;
206 warn!("query is empty [{}]", request.query);
207 return Ok(sr);
208 }
209
210 if let Some(dbqp) = self.using_dbqp.get_mut(&db_names) {
211 let mut xapian_enquire = dbqp.db.new_enquire()?;
212
213 xapian_enquire.set_query(&mut query)?;
214
215 if let Some(s) = get_sorter(&request.sort, &self.key2slot)? {
216 xapian_enquire.set_sort_by_key(s, true)?;
217 }
218
219 sr = exec_xapian_query_and_queue_authorize(request, &mut xapian_enquire, add_out_element, op_auth, out_list, &mut self.az).await;
220 }
221
222 debug!("res={:?}", sr);
223 sr.total_time = total_time.elapsed().as_millis() as i64;
224 sr.query_time = sr.total_time - sr.authorize_time;
225
226 Ok(sr)
227 }
228
229 pub fn load_index_schema(&mut self, storage: &mut VStorage) {
230 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
231 ctx.push(id.to_owned());
232 }
233 let mut ctx: Vec<String> = vec![];
234
235 match block_on(self.query_use_collect_fn(
236 &FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"),
237 add_out_element,
238 OptAuthorize::NO,
239 &mut ctx,
240 )) {
241 Ok(res) => {
242 if res.result_code == ResultCode::Ok && res.count > 0 {
243 for id in ctx.iter() {
244 let indv = &mut Individual::default();
245 if storage.get_individual(id, indv).is_ok() {
246 self.index_schema.add_schema_data(&self.onto, indv);
247 }
248 }
249 } else {
250 error!("fail load index schema, err={:?}", res.result_code);
251 }
252 },
253 Err(e) => match e {
254 XError::Xapian(code) => {
255 error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
256 },
257 XError::Io(e) => {
258 error!("fail load index schema, err={:?}", e);
259 },
260 },
261 }
262 }
263
264 pub async fn c_load_index_schema(&mut self, storage: &AStorage) {
265 fn add_out_element(id: &str, ctx: &mut Vec<String>) {
266 ctx.push(id.to_owned());
267 }
268 let mut ctx: Vec<String> = vec![];
269
270 info!("start load index schema");
271 match self
272 .query_use_collect_fn(&FTQuery::new_with_user("cfg:VedaSystem", "'rdf:type' === 'vdi:ClassIndex'"), add_out_element, OptAuthorize::NO, &mut ctx)
273 .await
274 {
275 Ok(res) => {
276 if res.result_code == ResultCode::Ok && res.count > 0 {
277 for id in ctx.iter() {
278 if let Ok((mut indv, res)) = get_individual_from_db(id, "", storage, None).await {
279 if res == ResultCode::Ok {
280 self.index_schema.add_schema_data(&self.onto, &mut indv);
281 }
282 }
283 }
284 } else {
285 error!("fail load index schema, err={:?}", res.result_code);
286 }
287 },
288 Err(e) => match e {
289 XError::Xapian(code) => {
290 error!("fail load index schema, err={} ({})", get_xapian_err_type(code), code);
291 },
292 XError::Io(e) => {
293 error!("fail load index schema, err={:?}", e);
294 },
295 },
296 }
297
298 info!("load index schema, size={}", self.index_schema.len());
299 }
300
301 fn reopen_dbs(&mut self) -> Result<()> {
302 for (_, el) in self.using_dbqp.iter_mut() {
303 el.db.reopen()?;
304 el.qp.set_database(&mut el.db)?;
305 }
306
307 for (_, db) in self.opened_db.iter_mut() {
308 db.reopen()?;
309 }
310
311 Ok(())
312 }
313
314 fn _close_dbs(&mut self) -> Result<()> {
315 for (_, el) in self.using_dbqp.iter_mut() {
316 el.db.close()?;
317 }
318
319 for (_, db) in self.opened_db.iter_mut() {
320 db.close()?;
321 }
322
323 Ok(())
324 }
325
326 fn open_db_if_need(&mut self, db_name: &str) -> Result<()> {
327 if !self.opened_db.contains_key(db_name) {
328 if let Some(path) = self.db2path.get(db_name) {
329 let db = Database::new_with_path(&("./".to_owned() + path), UNKNOWN)?;
330 self.opened_db.insert(db_name.to_owned(), db);
331 } else {
332 return Err(XError::from(Error::new(ErrorKind::Other, "db2path invalid")));
333 }
334 }
335 Ok(())
336 }
337
338 fn open_dbqp_if_need(&mut self, db_names: &[String]) -> Result<()> {
339 if !self.using_dbqp.contains_key(db_names) {
340 for el in db_names {
341 self.open_db_if_need(el)?;
342 }
343
344 let mut dbqp = DatabaseQueryParser {
345 db: Database::new()?,
346 qp: QueryParser::new()?,
347 };
348
349 for el in db_names {
350 self.open_db_if_need(el)?;
351 dbqp.add_database(el, &mut self.opened_db)?;
352 }
353
354 dbqp.qp.set_max_wildcard_expansion(MAX_WILDCARD_EXPANSION)?;
355
356 self.xapian_stemmer = Stem::new(&self.xapian_lang)?;
357
358 dbqp.qp.set_stemmer(&mut self.xapian_stemmer)?;
359
360 dbqp.qp.set_database(&mut dbqp.db)?;
361
362 self.using_dbqp.insert(db_names.to_vec(), dbqp);
363 }
364 Ok(())
368 }
369
370 fn get_dn_names(&self, tta: &TTA, db_names_str: &str) -> Vec<String> {
371 let mut db_names = vec![];
372
373 if db_names_str.is_empty() {
374 let mut databases = HashMap::new();
375 self.db_names_from_tta(tta, &mut databases);
376
377 for (key, value) in databases.iter() {
378 if !(*value) {
379 if key != "not-indexed" {
380 db_names.push(key.to_owned());
381 }
382
383 if key == "deleted" {
385 db_names.clear();
386 db_names.push(key.to_owned());
387 break;
388 }
389 }
390 }
391 } else {
392 for el in db_names_str.split(',') {
393 db_names.push(String::from(el).trim().to_owned());
394 }
395 }
396
397 if db_names.is_empty() {
398 db_names.push("base".to_owned());
399 }
400
401 db_names
402 }
403
404 fn db_names_from_tta(&self, tta: &TTA, db_names: &mut HashMap<String, bool>) -> String {
405 let mut ll = String::default();
406 let mut rr = String::default();
407
408 if let Some(l) = &tta.l {
409 ll = self.db_names_from_tta(l, db_names)
410 };
411
412 if let Some(r) = &tta.r {
413 rr = self.db_names_from_tta(r, db_names);
414 }
415
416 if !ll.is_empty() && !rr.is_empty() {
417 if ll == "rdf:type" {
418 let dbn = self.index_schema.get_dbname_of_class(&rr);
419 db_names.insert(dbn.to_owned(), false);
420 } else if ll == "v-s:deleted" {
421 db_names.insert("deleted".to_owned(), false);
422 }
423 }
424
425 tta.op.to_owned()
426 }
427}