1use crate::{add_field_to_doc_json, build_schema_for_fields, get_term};
6use anyhow::bail;
7use bytes::{BufMut, Bytes, BytesMut};
8use ordinary_config::{ContentDefinition, ContentLimits};
9use ordinary_types::{ContentObject, Kind, json_to_flexbuffer_vec};
10use rustc_hash::{FxHashMap, FxHashSet};
11use saferlmdb::{
12 self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
13};
14
15use std::path::Path;
16use std::str::FromStr;
17use std::sync::Arc;
18use tantivy::collector::TopDocs;
19use tantivy::directory::MmapDirectory;
20use tantivy::query::FuzzyTermQuery;
21use tantivy::schema::{Schema, Value};
22use tantivy::{Index, IndexWriter, TantivyDocument};
23use tracing::instrument;
24use uuid::Uuid;
25
26pub struct ContentStore {
27 pub limits: ContentLimits,
28 env: Arc<Environment>,
29
30 content_db: Arc<Database<'static>>,
36
37 content_defs: Vec<ContentDefinition>,
38 content_def_map: FxHashMap<String, ContentDefinition>,
39
40 content_search_indexes: Vec<Option<(Schema, Index, tantivy::IndexReader)>>,
41
42 log_size: bool,
43}
44
45impl ContentStore {
46 pub fn new(
47 limits: ContentLimits,
48 mut content_defs: Vec<ContentDefinition>,
49 env: &Arc<Environment>,
50 search_dir: impl AsRef<Path>,
51 log_size: bool,
52 ) -> anyhow::Result<Self> {
53 let content_db = Arc::new(Database::open(
54 env.clone(),
55 Some("content"),
56 &DatabaseOptions::new(lmdb::db::Flags::CREATE),
57 )?);
58
59 content_defs.sort_by_key(|a| a.idx);
60 for def in &mut content_defs {
61 def.fields.sort_by_key(|a| a.idx);
62 }
63
64 let mut content_def_map = FxHashMap::default();
65 let mut content_search_indexes = vec![];
66
67 for def in &content_defs {
68 content_def_map.insert(def.name.clone(), def.clone());
69
70 if limits.search_enabled {
71 let mut schema_builder = Schema::builder();
72
73 let should_build = build_schema_for_fields(&mut schema_builder, &def.fields, false);
74
75 if should_build {
76 let schema = schema_builder.build();
77
78 let index_dir = search_dir
79 .as_ref()
80 .join("content")
81 .join(def.idx.to_string());
82 std::fs::create_dir_all(&index_dir)?;
83
84 let dir = MmapDirectory::open(index_dir)?;
85 let index = Index::open_or_create(dir, schema.clone())?;
86
87 let reader = index
88 .reader_builder()
89 .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);
90
91 content_search_indexes.push(Some((schema, index, reader.try_into()?)));
92 } else {
93 content_search_indexes.push(None);
94 drop(schema_builder);
95 }
96 }
97 }
98
99 Ok(Self {
100 limits,
101 env: env.clone(),
102 content_db,
103
104 content_defs,
105 content_def_map,
106
107 content_search_indexes,
108
109 log_size,
110 })
111 }
112
113 #[allow(clippy::too_many_lines)]
114 #[instrument(skip_all, err)]
115 pub fn update(&self, objects: &Vec<ContentObject>) -> anyhow::Result<()> {
116 let mut size = 0;
119 let mut inserts = 0;
120
121 let txn = WriteTransaction::new(self.env.clone())?;
122
123 {
124 let mut access = txn.access();
125 let mut object_cursor = txn.cursor(self.content_db.clone())?;
126
127 let mut del_keys = vec![];
128
129 loop {
130 if let Ok((k, _)) = object_cursor.get_current::<[u8], [u8]>(&access) {
131 del_keys.push(Bytes::copy_from_slice(k));
132 }
133
134 if object_cursor.next::<[u8], [u8]>(&access).is_err() {
135 break;
136 }
137 }
138
139 for del_key in del_keys {
140 access.del_key(&self.content_db, &del_key[..])?;
141 }
142
143 let mut seen_instances = FxHashSet::default();
144
145 for object in objects {
146 let mut keys = FxHashSet::default();
147 let mut real_fields_map = FxHashMap::default();
148
149 for field in &object.fields {
150 real_fields_map.insert(field.name.clone(), &field.value);
151 }
152
153 let mut field_name_map = FxHashMap::default();
154
155 let mut search = None;
156
157 if let Some(content_def) = self.content_def_map.get(&object.instance_of) {
158 if let Some(searchable) =
159 self.content_search_indexes.get(content_def.idx as usize)
160 && let Some((schema, index, _reader)) = searchable
161 {
162 let writer: IndexWriter = index.writer(15_000_000)?;
164
165 if !seen_instances.contains(&object.instance_of) {
166 writer.delete_all_documents()?;
167 seen_instances.insert(object.instance_of.clone());
168 }
169
170 if self.limits.search_enabled {
171 search = Some((TantivyDocument::default(), schema, index, writer));
172 } else {
173 tracing::warn!(
174 i = content_def.idx,
175 nm = content_def.name,
176 "search disabled"
177 );
178 }
179 }
180
181 let mut first_indexed = true;
182
183 for field in &content_def.fields {
184 field_name_map.insert(
185 &field.name,
186 (field.idx, field.kind.clone(), field.searchable),
187 );
188
189 if field.indexed == Some(true)
190 && let Some(real_field_value) = real_fields_map.get(&field.name)
191 {
192 match field.kind {
193 Kind::String | Kind::Url => {
194 if let Some(value) = real_field_value.as_str() {
195 let mut key = BytesMut::new();
196 key.put_u8(content_def.idx);
197 key.put_u8(field.idx);
198 key.put(value.as_bytes());
199
200 if keys.contains(&key) {
201 bail!(
202 "duplicate index value '{}' for definition '{}' on field '{}'",
203 value,
204 content_def.name,
205 field.name
206 );
207 }
208
209 if first_indexed {
210 if let Some((doc, schema, _index, _writer)) =
211 &mut search
212 {
213 let tantivy_field =
214 schema.get_field(&field.idx.to_string())?;
215 doc.add_bytes(tantivy_field, &key);
216 }
217
218 first_indexed = false;
219 }
220
221 keys.insert(key);
222 }
223 }
224 Kind::Uuid => {
225 if let Some(value) = real_field_value.as_str() {
226 let uuid = Uuid::from_str(value)?;
227
228 let mut key = BytesMut::new();
229 key.put_u8(content_def.idx);
230 key.put_u8(field.idx);
231 key.put(uuid.as_ref());
232
233 if keys.contains(&key) {
234 bail!(
235 "duplicate index value '{}' for definition '{}' on field '{}'",
236 value,
237 content_def.name,
238 field.name
239 );
240 }
241
242 if first_indexed {
243 if let Some((doc, schema, _index, _writer)) =
244 &mut search
245 {
246 let tantivy_field =
247 schema.get_field(&field.idx.to_string())?;
248 doc.add_bytes(tantivy_field, &key);
249 }
250
251 first_indexed = false;
252 }
253
254 keys.insert(key);
255 }
256 }
257 _ => {
258 bail!("cannot index on type non-uuid/string/url types");
259 }
260 }
261 }
262 }
263 }
264
265 let mut sorted_fields = object.fields.clone();
266
267 sorted_fields.sort_by(|a, b| {
268 if let Some((idx1, _, _)) = field_name_map.get(&a.name) {
269 if let Some((idx2, _, _)) = field_name_map.get(&b.name) {
270 idx1.cmp(idx2)
271 } else {
272 0.cmp(&0)
273 }
274 } else {
275 0.cmp(&0)
276 }
277 });
278
279 let mut object_builder =
280 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
281 let mut object_vector = object_builder.start_vector();
282
283 let mut field_too_large = false;
284
285 for field in sorted_fields {
286 if let Some((field_idx, kind, searchable)) = field_name_map.get(&field.name) {
287 json_to_flexbuffer_vec(kind, &field.value, &mut object_vector)?;
288 let field_str = field.value.to_string();
289
290 if field_str.len() as u64 > self.limits.max_field_size {
291 field_too_large = true;
292
293 tracing::warn!(def = object.instance_of, object = ?object.fields, field = field_str, "field too large");
294 break;
295 }
296
297 if searchable == &Some(true)
298 && let Some((doc, schema, _index, _writer)) = &mut search
299 {
300 let tantivy_field = schema.get_field(&field_idx.to_string())?;
301 add_field_to_doc_json(doc, kind, tantivy_field, &field.value)?;
302 }
303 }
304 }
305
306 object_vector.end_vector();
307
308 if !field_too_large {
309 let mut object_too_large = false;
310 let object_bytes = object_builder.view();
311
312 for key in keys {
313 let object_size = (object_bytes.len() + key.len()) as u64;
314
315 if object_size > self.limits.max_object_size {
316 object_too_large = true;
317
318 tracing::warn!(def = object.instance_of, object = ?object.fields, "object exceeds size limit");
319 break;
320 }
321
322 if size + object_size > self.limits.max_store_size {
323 object_too_large = true;
324
325 tracing::warn!(def = object.instance_of, object = ?object.fields, "object store size limit exceeded");
326 break;
327 }
328
329 access.put(
330 &self.content_db,
331 key.as_ref(),
332 object_bytes,
333 &put::Flags::empty(),
334 )?;
335
336 inserts += 1;
337
338 if self.log_size {
339 size += object_size;
340 }
341 }
342
343 if !object_too_large && let Some((doc, _schema, _index, mut writer)) = search {
344 writer.add_document(doc)?;
345 writer.commit()?;
346 writer.wait_merging_threads()?;
347 }
348 }
349 }
350 }
351
352 if self.log_size {
353 tracing::info!(inserts, size = %bytesize::ByteSize(size).display().si_short());
354 } else {
355 tracing::info!(inserts);
356 }
357
358 txn.commit()?;
359
360 Ok(())
361 }
362
363 #[instrument(skip_all, fields(i, nm), err)]
364 pub fn get(&self, def_idx: u8, field_idx: u8, field_value: &[u8]) -> anyhow::Result<Bytes> {
365 if let Some(content_def) = self.content_defs.get(def_idx as usize) {
366 let curr_span = tracing::Span::current();
367 curr_span.record("i", content_def.idx);
368 curr_span.record("nm", tracing::field::display(&content_def.name));
369
370 if let Some(field) = content_def.fields.get(field_idx as usize) {
371 match field.kind {
372 Kind::String | Kind::Url => tracing::info!(
373 f = field.idx,
374 nm = %field.name,
375 v = %std::str::from_utf8(field_value)?
376 ),
377 Kind::Uuid => {
378 if field_value.len() == 16 {
379 let uuid: [u8; 16] = field_value[0..16].try_into()?;
380 tracing::info!(
381 f = field.idx,
382 nm = %field.name,
383 v = Uuid::from_bytes(uuid).to_string()
384 );
385 } else {
386 bail!("uuid is not 16 bytes");
387 }
388 }
389 _ => tracing::error!("kind {:?} not yet supported as indexable", field.kind),
390 }
391
392 let mut key = vec![def_idx, field_idx];
393 key.extend_from_slice(field_value);
394
395 let txn = ReadTransaction::new(self.env.clone())?;
396
397 let access = txn.access();
398 let result = access.get(&self.content_db, &key)?;
399
400 Ok(Bytes::copy_from_slice(result))
401 } else {
402 bail!("no field idx {field_idx} for content definition {def_idx}")
403 }
404 } else {
405 bail!("no content definition for idx {def_idx}")
406 }
407 }
408
409 #[instrument(skip_all, fields(i, nm), err)]
410 pub fn list(&self, def_idx: u8) -> anyhow::Result<Bytes> {
411 if let Some(content_def) = self.content_defs.get(def_idx as usize) {
412 let curr_span = tracing::Span::current();
413 curr_span.record("i", content_def.idx);
414 curr_span.record("nm", tracing::field::display(&content_def.name));
415
416 let mut count = 0;
417
418 let mut objects_builder =
419 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
420 let mut objects_vec = objects_builder.start_vector();
421
422 let txn = ReadTransaction::new(self.env.clone())?;
423 let access = txn.access();
424
425 let mut object_cursor = txn.cursor(self.content_db.clone())?;
426
427 let (k, v) = object_cursor.seek_range_k::<[u8], [u8]>(&access, &[def_idx])?;
428
429 count += 1;
430 objects_vec.push(flexbuffers::Blob(v));
431
432 let initial_field = k[1];
433
434 while let Ok((k, v)) = object_cursor.next::<[u8], [u8]>(&access) {
435 if k[1] != initial_field {
436 break;
437 }
438
439 if k[0] == def_idx {
440 count += 1;
441 objects_vec.push(flexbuffers::Blob(v));
442 } else {
443 break;
444 }
445 }
446
447 objects_vec.end_vector();
448 tracing::info!(ct = count);
449
450 return Ok(Bytes::copy_from_slice(objects_builder.view()));
451 }
452
453 bail!("no content definition for idx {def_idx}")
454 }
455
456 #[instrument(skip_all, fields(i, nm), err)]
457 pub fn search_content(
458 &self,
459 def_idx: u8,
460 field_idx: u8,
461 value: &[u8],
462 ) -> anyhow::Result<Bytes> {
463 if let Some(content_def) = self.content_defs.get(def_idx as usize) {
464 let curr_span = tracing::Span::current();
465 curr_span.record("i", content_def.idx);
466 curr_span.record("nm", tracing::field::display(&content_def.name));
467
468 if let Some(first_indexed_field) =
469 content_def.fields.iter().find(|v| v.indexed == Some(true))
470 {
471 if let Some(searchable) = self.content_search_indexes.get(def_idx as usize) {
472 #[allow(clippy::collapsible_match)]
473 if let Some((schema, _index, reader)) = searchable {
474 if let Some(content_field) = content_def.fields.get(field_idx as usize) {
475 let field = schema.get_field(&field_idx.to_string())?;
476
477 let term = get_term(&content_field.kind, field, value)?;
478 let query = FuzzyTermQuery::new(term, 2, true);
479
480 let searcher = reader.searcher();
481 let docs = searcher
482 .search(&query, &TopDocs::with_limit(255).order_by_score())?;
483
484 let mut objects_builder =
485 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
486 let mut objects_vec = objects_builder.start_vector();
487
488 let txn = ReadTransaction::new(self.env.clone())?;
489 let access = txn.access();
490
491 tracing::info!(ct = docs.len());
492
493 for (_score, doc_addr) in docs {
494 let doc: TantivyDocument = searcher.doc(doc_addr)?;
495 if let Some(indexed_val) = doc.get_first(
496 schema.get_field(&first_indexed_field.idx.to_string())?,
497 ) && let Some(indexed_val) = indexed_val.as_bytes()
498 {
499 let v =
500 access.get::<[u8], [u8]>(&self.content_db, indexed_val)?;
501
502 objects_vec.push(flexbuffers::Blob(v));
503 }
504 }
505
506 objects_vec.end_vector();
507
508 Ok(Bytes::copy_from_slice(objects_builder.view()))
509 } else {
510 bail!("no field idx {field_idx} for content {def_idx}")
511 }
512 } else {
513 bail!("content {def_idx} has no searchable fields")
514 }
515 } else {
516 bail!("content {def_idx} has no searchable fields")
517 }
518 } else {
519 bail!("content {def_idx} has no indexed fields")
520 }
521 } else {
522 bail!("no content definition at idx {def_idx}")
523 }
524 }
525}