use crate::{add_field_to_doc_json, build_schema_for_fields, get_term};
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use ordinary_config::{ContentDefinition, ContentLimits};
use ordinary_types::{ContentObject, Kind, json_to_flexbuffer_vec};
use rustc_hash::{FxHashMap, FxHashSet};
use saferlmdb::{
self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use tantivy::collector::TopDocs;
use tantivy::directory::MmapDirectory;
use tantivy::query::FuzzyTermQuery;
use tantivy::schema::{Schema, Value};
use tantivy::{Index, IndexWriter, TantivyDocument};
use tracing::instrument;
use uuid::Uuid;
/// Content storage backed by an LMDB database named `"content"`, with an
/// optional Tantivy search index per content definition.
///
/// Stored keys have the layout `[definition idx, field idx, indexed value bytes…]`
/// and stored values are flexbuffer-encoded objects (see `update`).
pub struct ContentStore {
/// Size/search limits consulted when writing (`max_field_size`,
/// `max_object_size`, `max_store_size`) and when deciding whether search
/// indexes are built (`search_enabled`).
pub limits: ContentLimits,
/// Shared LMDB environment used to open every transaction.
env: Arc<Environment>,
/// Handle to the `"content"` database inside `env`.
content_db: Arc<Database<'static>>,
/// Content definitions sorted by `idx`, each with its fields sorted by `idx`.
/// Lookups by `def_idx`/`field_idx` index into these vectors by position.
content_defs: Vec<ContentDefinition>,
/// The same definitions keyed by definition name.
content_def_map: FxHashMap<String, ContentDefinition>,
/// Search state per definition, pushed in `content_defs` order while search
/// is enabled; `None` when no schema was built for that definition. Left
/// empty when search is disabled.
content_search_indexes: Vec<Option<(Schema, Index, tantivy::IndexReader)>>,
/// When true, `update` includes the total stored size in its summary log.
log_size: bool,
}
impl ContentStore {
/// Opens (creating if needed) the `"content"` database in `env` and, when
/// `limits.search_enabled` is set, opens or creates one Tantivy index per
/// content definition under `<search_dir>/content/<definition idx>`.
///
/// Definitions are sorted by `idx`, and each definition's fields are sorted
/// by `idx`, before anything else is derived from them.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, an index directory
/// cannot be created, or a Tantivy index/reader cannot be opened.
pub fn new(
    limits: ContentLimits,
    mut content_defs: Vec<ContentDefinition>,
    env: &Arc<Environment>,
    search_dir: impl AsRef<Path>,
    log_size: bool,
) -> anyhow::Result<Self> {
    let db_options = DatabaseOptions::new(lmdb::db::Flags::CREATE);
    let content_db = Arc::new(Database::open(
        Arc::clone(env),
        Some("content"),
        &db_options,
    )?);

    // Positional lookups elsewhere rely on both levels being ordered by idx.
    content_defs.sort_by_key(|definition| definition.idx);
    for definition in &mut content_defs {
        definition.fields.sort_by_key(|field| field.idx);
    }

    let index_root = search_dir.as_ref().join("content");
    let mut content_def_map = FxHashMap::default();
    let mut content_search_indexes = Vec::new();

    for definition in &content_defs {
        content_def_map.insert(definition.name.clone(), definition.clone());

        if !limits.search_enabled {
            continue;
        }

        let mut schema_builder = Schema::builder();
        if !build_schema_for_fields(&mut schema_builder, &definition.fields, false) {
            // Keep the slot so positions still line up with `content_defs`.
            content_search_indexes.push(None);
            continue;
        }

        let schema = schema_builder.build();
        let index_dir = index_root.join(definition.idx.to_string());
        std::fs::create_dir_all(&index_dir)?;

        let directory = MmapDirectory::open(index_dir)?;
        let index = Index::open_or_create(directory, schema.clone())?;
        let reader: tantivy::IndexReader = index
            .reader_builder()
            .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay)
            .try_into()?;

        content_search_indexes.push(Some((schema, index, reader)));
    }

    Ok(Self {
        limits,
        env: Arc::clone(env),
        content_db,
        content_defs,
        content_def_map,
        content_search_indexes,
        log_size,
    })
}
/// Replaces the whole content store with `objects`.
///
/// Every existing key in the content database is deleted, then each object is:
///
/// 1. keyed once per indexed field as
///    `[definition idx, field idx, value bytes]` (UTF-8 bytes for string/url
///    fields, the 16 raw bytes for uuid fields);
/// 2. serialized as a flexbuffer vector of its known fields ordered by the
///    definition's field `idx` (fields not present in the definition are
///    ignored);
/// 3. written under every one of its keys, and, when a search index exists for
///    the definition, added to that index together with the key of its first
///    indexed field.
///
/// Objects that break a limit (`max_field_size`, `max_object_size`,
/// `max_store_size`) are skipped with a warning rather than failing the call.
/// The running store size is always tracked so `max_store_size` is enforced
/// whether or not size logging is enabled.
///
/// # Errors
///
/// Fails on a duplicate index value within one object, an unparsable uuid, an
/// indexed field of an unsupported kind, or any LMDB / Tantivy / serialization
/// error. The LMDB transaction is only committed at the very end.
#[allow(clippy::too_many_lines)]
#[instrument(skip_all, err)]
pub fn update(&self, objects: &Vec<ContentObject>) -> anyhow::Result<()> {
    // Total bytes written so far (key + value for every stored entry).
    let mut size = 0;
    let mut inserts = 0;
    let txn = WriteTransaction::new(self.env.clone())?;
    {
        let mut access = txn.access();
        let mut object_cursor = txn.cursor(self.content_db.clone())?;

        // Collect every existing key first, then delete them. On the first
        // pass the cursor is not positioned yet, so `get_current` fails and
        // `next` moves it onto the first entry.
        let mut del_keys = vec![];
        loop {
            if let Ok((k, _)) = object_cursor.get_current::<[u8], [u8]>(&access) {
                del_keys.push(Bytes::copy_from_slice(k));
            }
            if object_cursor.next::<[u8], [u8]>(&access).is_err() {
                break;
            }
        }
        for del_key in del_keys {
            access.del_key(&self.content_db, &del_key[..])?;
        }

        // Definitions whose search index has already been cleared in this call.
        let mut seen_instances = FxHashSet::default();

        for object in objects {
            let mut keys = FxHashSet::default();

            let mut real_fields_map = FxHashMap::default();
            for field in &object.fields {
                real_fields_map.insert(field.name.clone(), &field.value);
            }

            // field name -> (field idx, kind, searchable) for this object's definition.
            let mut field_name_map = FxHashMap::default();
            let mut search = None;

            if let Some(content_def) = self.content_def_map.get(&object.instance_of) {
                if let Some(searchable) =
                    self.content_search_indexes.get(content_def.idx as usize)
                    && let Some((schema, index, _reader)) = searchable
                {
                    let writer: IndexWriter = index.writer(15_000_000)?;
                    if !seen_instances.contains(&object.instance_of) {
                        // NOTE(review): this delete only becomes durable with the
                        // commit further down; if this object ends up skipped the
                        // writer is dropped uncommitted — confirm that is intended.
                        writer.delete_all_documents()?;
                        seen_instances.insert(object.instance_of.clone());
                    }
                    if self.limits.search_enabled {
                        search = Some((TantivyDocument::default(), schema, index, writer));
                    } else {
                        tracing::warn!(
                            i = content_def.idx,
                            nm = content_def.name,
                            "search disabled"
                        );
                    }
                }

                // Only the key of the first indexed field is stored in the search
                // document; `search_content` uses it to load the object.
                let mut first_indexed = true;
                for field in &content_def.fields {
                    field_name_map.insert(
                        &field.name,
                        (field.idx, field.kind.clone(), field.searchable),
                    );

                    if field.indexed == Some(true)
                        && let Some(real_field_value) = real_fields_map.get(&field.name)
                    {
                        let make_key = |suffix: &[u8]| {
                            let mut key = BytesMut::with_capacity(2 + suffix.len());
                            key.put_u8(content_def.idx);
                            key.put_u8(field.idx);
                            key.put_slice(suffix);
                            key
                        };

                        // `None` means the value is not a JSON string: no key is
                        // produced for this field.
                        let indexed_key = match field.kind {
                            Kind::String | Kind::Url => real_field_value
                                .as_str()
                                .map(|value| (value, make_key(value.as_bytes()))),
                            Kind::Uuid => match real_field_value.as_str() {
                                Some(value) => {
                                    let uuid = Uuid::from_str(value)?;
                                    Some((value, make_key(uuid.as_bytes().as_slice())))
                                }
                                None => None,
                            },
                            _ => {
                                bail!("cannot index on type non-uuid/string/url types");
                            }
                        };

                        if let Some((value, key)) = indexed_key {
                            if keys.contains(&key) {
                                bail!(
                                    "duplicate index value '{}' for definition '{}' on field '{}'",
                                    value,
                                    content_def.name,
                                    field.name
                                );
                            }
                            if first_indexed {
                                if let Some((doc, schema, _index, _writer)) = &mut search {
                                    let tantivy_field =
                                        schema.get_field(&field.idx.to_string())?;
                                    doc.add_bytes(tantivy_field, &key);
                                }
                                first_indexed = false;
                            }
                            keys.insert(key);
                        }
                    }
                }
            }

            // Order the object's fields by the definition's field idx. Sorting by
            // an `Option` key gives a total order (unknown fields sort first and
            // are skipped below), unlike a comparator that reports "equal"
            // whenever one side is unknown.
            let mut sorted_fields: Vec<_> = object.fields.iter().collect();
            sorted_fields.sort_by_key(|field| {
                field_name_map.get(&field.name).map(|(idx, _, _)| *idx)
            });

            let mut object_builder =
                flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
            let mut object_vector = object_builder.start_vector();
            let mut field_too_large = false;

            for field in sorted_fields {
                if let Some((field_idx, kind, searchable)) = field_name_map.get(&field.name) {
                    json_to_flexbuffer_vec(kind, &field.value, &mut object_vector)?;

                    // The limit is applied to the JSON text length of the value.
                    let field_str = field.value.to_string();
                    if field_str.len() as u64 > self.limits.max_field_size {
                        field_too_large = true;
                        tracing::warn!(def = object.instance_of, object = ?object.fields, field = field_str, "field too large");
                        break;
                    }

                    if searchable == &Some(true)
                        && let Some((doc, schema, _index, _writer)) = &mut search
                    {
                        let tantivy_field = schema.get_field(&field_idx.to_string())?;
                        add_field_to_doc_json(doc, kind, tantivy_field, &field.value)?;
                    }
                }
            }
            object_vector.end_vector();

            if !field_too_large {
                let mut object_too_large = false;
                let object_bytes = object_builder.view();

                for key in keys {
                    let object_size = (object_bytes.len() + key.len()) as u64;
                    if object_size > self.limits.max_object_size {
                        object_too_large = true;
                        tracing::warn!(def = object.instance_of, object = ?object.fields, "object exceeds size limit");
                        break;
                    }
                    if size + object_size > self.limits.max_store_size {
                        object_too_large = true;
                        tracing::warn!(def = object.instance_of, object = ?object.fields, "object store size limit exceeded");
                        break;
                    }

                    access.put(
                        &self.content_db,
                        key.as_ref(),
                        object_bytes,
                        &put::Flags::empty(),
                    )?;
                    inserts += 1;
                    // Always account for the write so the store-size limit above
                    // works even when size logging is turned off.
                    size += object_size;
                }

                if !object_too_large && let Some((doc, _schema, _index, mut writer)) = search {
                    writer.add_document(doc)?;
                    writer.commit()?;
                    writer.wait_merging_threads()?;
                }
            }
        }
    }

    if self.log_size {
        tracing::info!(inserts, size = %bytesize::ByteSize(size).display().si_short());
    } else {
        tracing::info!(inserts);
    }

    txn.commit()?;
    Ok(())
}
#[instrument(skip_all, fields(i, nm), err)]
pub fn get(&self, def_idx: u8, field_idx: u8, field_value: &[u8]) -> anyhow::Result<Bytes> {
if let Some(content_def) = self.content_defs.get(def_idx as usize) {
let curr_span = tracing::Span::current();
curr_span.record("i", content_def.idx);
curr_span.record("nm", tracing::field::display(&content_def.name));
if let Some(field) = content_def.fields.get(field_idx as usize) {
match field.kind {
Kind::String | Kind::Url => tracing::info!(
f = field.idx,
nm = %field.name,
v = %std::str::from_utf8(field_value)?
),
Kind::Uuid => {
if field_value.len() == 16 {
let uuid: [u8; 16] = field_value[0..16].try_into()?;
tracing::info!(
f = field.idx,
nm = %field.name,
v = Uuid::from_bytes(uuid).to_string()
);
} else {
bail!("uuid is not 16 bytes");
}
}
_ => tracing::error!("kind {:?} not yet supported as indexable", field.kind),
}
let mut key = vec![def_idx, field_idx];
key.extend_from_slice(field_value);
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let result = access.get(&self.content_db, &key)?;
Ok(Bytes::copy_from_slice(result))
} else {
bail!("no field idx {field_idx} for content definition {def_idx}")
}
} else {
bail!("no content definition for idx {def_idx}")
}
}
#[instrument(skip_all, fields(i, nm), err)]
pub fn list(&self, def_idx: u8) -> anyhow::Result<Bytes> {
if let Some(content_def) = self.content_defs.get(def_idx as usize) {
let curr_span = tracing::Span::current();
curr_span.record("i", content_def.idx);
curr_span.record("nm", tracing::field::display(&content_def.name));
let mut count = 0;
let mut objects_builder =
flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
let mut objects_vec = objects_builder.start_vector();
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let mut object_cursor = txn.cursor(self.content_db.clone())?;
let (k, v) = object_cursor.seek_range_k::<[u8], [u8]>(&access, &[def_idx])?;
count += 1;
objects_vec.push(flexbuffers::Blob(v));
let initial_field = k[1];
while let Ok((k, v)) = object_cursor.next::<[u8], [u8]>(&access) {
if k[1] != initial_field {
break;
}
if k[0] == def_idx {
count += 1;
objects_vec.push(flexbuffers::Blob(v));
} else {
break;
}
}
objects_vec.end_vector();
tracing::info!(ct = count);
return Ok(Bytes::copy_from_slice(objects_builder.view()));
}
bail!("no content definition for idx {def_idx}")
}
#[instrument(skip_all, fields(i, nm), err)]
pub fn search_content(
&self,
def_idx: u8,
field_idx: u8,
value: &[u8],
) -> anyhow::Result<Bytes> {
if let Some(content_def) = self.content_defs.get(def_idx as usize) {
let curr_span = tracing::Span::current();
curr_span.record("i", content_def.idx);
curr_span.record("nm", tracing::field::display(&content_def.name));
if let Some(first_indexed_field) =
content_def.fields.iter().find(|v| v.indexed == Some(true))
{
if let Some(searchable) = self.content_search_indexes.get(def_idx as usize) {
#[allow(clippy::collapsible_match)]
if let Some((schema, _index, reader)) = searchable {
if let Some(content_field) = content_def.fields.get(field_idx as usize) {
let field = schema.get_field(&field_idx.to_string())?;
let term = get_term(&content_field.kind, field, value)?;
let query = FuzzyTermQuery::new(term, 2, true);
let searcher = reader.searcher();
let docs = searcher
.search(&query, &TopDocs::with_limit(255).order_by_score())?;
let mut objects_builder =
flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
let mut objects_vec = objects_builder.start_vector();
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
tracing::info!(ct = docs.len());
for (_score, doc_addr) in docs {
let doc: TantivyDocument = searcher.doc(doc_addr)?;
if let Some(indexed_val) = doc.get_first(
schema.get_field(&first_indexed_field.idx.to_string())?,
) && let Some(indexed_val) = indexed_val.as_bytes()
{
let v =
access.get::<[u8], [u8]>(&self.content_db, indexed_val)?;
objects_vec.push(flexbuffers::Blob(v));
}
}
objects_vec.end_vector();
Ok(Bytes::copy_from_slice(objects_builder.view()))
} else {
bail!("no field idx {field_idx} for content {def_idx}")
}
} else {
bail!("content {def_idx} has no searchable fields")
}
} else {
bail!("content {def_idx} has no searchable fields")
}
} else {
bail!("content {def_idx} has no indexed fields")
}
} else {
bail!("no content definition at idx {def_idx}")
}
}
}