Skip to main content

ordinary_storage/stores/
content.rs

1// Copyright (C) 2026 Ordinary Labs, LLC.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4
5use crate::{add_field_to_doc_json, build_schema_for_fields, get_term};
6use anyhow::bail;
7use bytes::{BufMut, Bytes, BytesMut};
8use ordinary_config::{ContentDefinition, ContentLimits};
9use ordinary_types::{ContentObject, Kind, json_to_flexbuffer_vec};
10use rustc_hash::{FxHashMap, FxHashSet};
11use saferlmdb::{
12    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
13};
14
15use std::path::Path;
16use std::str::FromStr;
17use std::sync::Arc;
18use tantivy::collector::TopDocs;
19use tantivy::directory::MmapDirectory;
20use tantivy::query::FuzzyTermQuery;
21use tantivy::schema::{Schema, Value};
22use tantivy::{Index, IndexWriter, TantivyDocument};
23use tracing::instrument;
24use uuid::Uuid;
25
/// LMDB-backed store for content objects, with optional per-definition
/// tantivy search indexes.
pub struct ContentStore {
    /// limits (field/object/store sizes, search toggle) consulted while writing
    pub limits: ContentLimits,
    /// shared LMDB environment that read and write transactions are opened on
    env: Arc<Environment>,

    /// stores the content to be piped into template renderers
    ///
    /// key([`definition_idx`][field_idx][`indexed_value`]) -> value([`flexbuffer_bytes`])
    ///
    /// in the '`flexbuffer_bytes`' references are stored as a vector of the internal `item_keys`.
    content_db: Arc<Database<'static>>,

    /// content definitions sorted by `idx` (each definition's fields are sorted by `idx` too);
    /// looked up by position, so position is assumed to equal `idx`
    content_defs: Vec<ContentDefinition>,
    /// the same definitions keyed by definition name
    content_def_map: FxHashMap<String, ContentDefinition>,

    /// one entry per definition, in `content_defs` order, filled only when search is enabled;
    /// `None` when no search schema was built for that definition
    content_search_indexes: Vec<Option<(Schema, Index, tantivy::IndexReader)>>,

    /// when set, `update` includes the number of bytes written in its summary log line
    log_size: bool,
}
44
45impl ContentStore {
46    pub fn new(
47        limits: ContentLimits,
48        mut content_defs: Vec<ContentDefinition>,
49        env: &Arc<Environment>,
50        search_dir: impl AsRef<Path>,
51        log_size: bool,
52    ) -> anyhow::Result<Self> {
53        let content_db = Arc::new(Database::open(
54            env.clone(),
55            Some("content"),
56            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
57        )?);
58
59        content_defs.sort_by_key(|a| a.idx);
60        for def in &mut content_defs {
61            def.fields.sort_by_key(|a| a.idx);
62        }
63
64        let mut content_def_map = FxHashMap::default();
65        let mut content_search_indexes = vec![];
66
67        for def in &content_defs {
68            content_def_map.insert(def.name.clone(), def.clone());
69
70            if limits.search_enabled {
71                let mut schema_builder = Schema::builder();
72
73                let should_build = build_schema_for_fields(&mut schema_builder, &def.fields, false);
74
75                if should_build {
76                    let schema = schema_builder.build();
77
78                    let index_dir = search_dir
79                        .as_ref()
80                        .join("content")
81                        .join(def.idx.to_string());
82                    std::fs::create_dir_all(&index_dir)?;
83
84                    let dir = MmapDirectory::open(index_dir)?;
85                    let index = Index::open_or_create(dir, schema.clone())?;
86
87                    let reader = index
88                        .reader_builder()
89                        .reload_policy(tantivy::ReloadPolicy::OnCommitWithDelay);
90
91                    content_search_indexes.push(Some((schema, index, reader.try_into()?)));
92                } else {
93                    content_search_indexes.push(None);
94                    drop(schema_builder);
95                }
96            }
97        }
98
99        Ok(Self {
100            limits,
101            env: env.clone(),
102            content_db,
103
104            content_defs,
105            content_def_map,
106
107            content_search_indexes,
108
109            log_size,
110        })
111    }
112
113    #[allow(clippy::too_many_lines)]
114    #[instrument(skip_all, err)]
115    pub fn update(&self, objects: &Vec<ContentObject>) -> anyhow::Result<()> {
116        // todo: write objects even if not indexed, queryable or searchable. "all" is still valid
117
118        let mut size = 0;
119        let mut inserts = 0;
120
121        let txn = WriteTransaction::new(self.env.clone())?;
122
123        {
124            let mut access = txn.access();
125            let mut object_cursor = txn.cursor(self.content_db.clone())?;
126
127            let mut del_keys = vec![];
128
129            loop {
130                if let Ok((k, _)) = object_cursor.get_current::<[u8], [u8]>(&access) {
131                    del_keys.push(Bytes::copy_from_slice(k));
132                }
133
134                if object_cursor.next::<[u8], [u8]>(&access).is_err() {
135                    break;
136                }
137            }
138
139            for del_key in del_keys {
140                access.del_key(&self.content_db, &del_key[..])?;
141            }
142
143            let mut seen_instances = FxHashSet::default();
144
145            for object in objects {
146                let mut keys = FxHashSet::default();
147                let mut real_fields_map = FxHashMap::default();
148
149                for field in &object.fields {
150                    real_fields_map.insert(field.name.clone(), &field.value);
151                }
152
153                let mut field_name_map = FxHashMap::default();
154
155                let mut search = None;
156
157                if let Some(content_def) = self.content_def_map.get(&object.instance_of) {
158                    if let Some(searchable) =
159                        self.content_search_indexes.get(content_def.idx as usize)
160                        && let Some((schema, index, _reader)) = searchable
161                    {
162                        // todo: see if this is reusable so it doesn't have to be allocated on every write
163                        let writer: IndexWriter = index.writer(15_000_000)?;
164
165                        if !seen_instances.contains(&object.instance_of) {
166                            writer.delete_all_documents()?;
167                            seen_instances.insert(object.instance_of.clone());
168                        }
169
170                        if self.limits.search_enabled {
171                            search = Some((TantivyDocument::default(), schema, index, writer));
172                        } else {
173                            tracing::warn!(
174                                i = content_def.idx,
175                                nm = content_def.name,
176                                "search disabled"
177                            );
178                        }
179                    }
180
181                    let mut first_indexed = true;
182
183                    for field in &content_def.fields {
184                        field_name_map.insert(
185                            &field.name,
186                            (field.idx, field.kind.clone(), field.searchable),
187                        );
188
189                        if field.indexed == Some(true)
190                            && let Some(real_field_value) = real_fields_map.get(&field.name)
191                        {
192                            match field.kind {
193                                Kind::String | Kind::Url => {
194                                    if let Some(value) = real_field_value.as_str() {
195                                        let mut key = BytesMut::new();
196                                        key.put_u8(content_def.idx);
197                                        key.put_u8(field.idx);
198                                        key.put(value.as_bytes());
199
200                                        if keys.contains(&key) {
201                                            bail!(
202                                                "duplicate index value '{}' for definition '{}' on field '{}'",
203                                                value,
204                                                content_def.name,
205                                                field.name
206                                            );
207                                        }
208
209                                        if first_indexed {
210                                            if let Some((doc, schema, _index, _writer)) =
211                                                &mut search
212                                            {
213                                                let tantivy_field =
214                                                    schema.get_field(&field.idx.to_string())?;
215                                                doc.add_bytes(tantivy_field, &key);
216                                            }
217
218                                            first_indexed = false;
219                                        }
220
221                                        keys.insert(key);
222                                    }
223                                }
224                                Kind::Uuid => {
225                                    if let Some(value) = real_field_value.as_str() {
226                                        let uuid = Uuid::from_str(value)?;
227
228                                        let mut key = BytesMut::new();
229                                        key.put_u8(content_def.idx);
230                                        key.put_u8(field.idx);
231                                        key.put(uuid.as_ref());
232
233                                        if keys.contains(&key) {
234                                            bail!(
235                                                "duplicate index value '{}' for definition '{}' on field '{}'",
236                                                value,
237                                                content_def.name,
238                                                field.name
239                                            );
240                                        }
241
242                                        if first_indexed {
243                                            if let Some((doc, schema, _index, _writer)) =
244                                                &mut search
245                                            {
246                                                let tantivy_field =
247                                                    schema.get_field(&field.idx.to_string())?;
248                                                doc.add_bytes(tantivy_field, &key);
249                                            }
250
251                                            first_indexed = false;
252                                        }
253
254                                        keys.insert(key);
255                                    }
256                                }
257                                _ => {
258                                    bail!("cannot index on type non-uuid/string/url types");
259                                }
260                            }
261                        }
262                    }
263                }
264
265                let mut sorted_fields = object.fields.clone();
266
267                sorted_fields.sort_by(|a, b| {
268                    if let Some((idx1, _, _)) = field_name_map.get(&a.name) {
269                        if let Some((idx2, _, _)) = field_name_map.get(&b.name) {
270                            idx1.cmp(idx2)
271                        } else {
272                            0.cmp(&0)
273                        }
274                    } else {
275                        0.cmp(&0)
276                    }
277                });
278
279                let mut object_builder =
280                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
281                let mut object_vector = object_builder.start_vector();
282
283                let mut field_too_large = false;
284
285                for field in sorted_fields {
286                    if let Some((field_idx, kind, searchable)) = field_name_map.get(&field.name) {
287                        json_to_flexbuffer_vec(kind, &field.value, &mut object_vector)?;
288                        let field_str = field.value.to_string();
289
290                        if field_str.len() as u64 > self.limits.max_field_size {
291                            field_too_large = true;
292
293                            tracing::warn!(def = object.instance_of, object = ?object.fields,  field = field_str, "field too large");
294                            break;
295                        }
296
297                        if searchable == &Some(true)
298                            && let Some((doc, schema, _index, _writer)) = &mut search
299                        {
300                            let tantivy_field = schema.get_field(&field_idx.to_string())?;
301                            add_field_to_doc_json(doc, kind, tantivy_field, &field.value)?;
302                        }
303                    }
304                }
305
306                object_vector.end_vector();
307
308                if !field_too_large {
309                    let mut object_too_large = false;
310                    let object_bytes = object_builder.view();
311
312                    for key in keys {
313                        let object_size = (object_bytes.len() + key.len()) as u64;
314
315                        if object_size > self.limits.max_object_size {
316                            object_too_large = true;
317
318                            tracing::warn!(def = object.instance_of, object = ?object.fields, "object exceeds size limit");
319                            break;
320                        }
321
322                        if size + object_size > self.limits.max_store_size {
323                            object_too_large = true;
324
325                            tracing::warn!(def = object.instance_of, object = ?object.fields, "object store size limit exceeded");
326                            break;
327                        }
328
329                        access.put(
330                            &self.content_db,
331                            key.as_ref(),
332                            object_bytes,
333                            &put::Flags::empty(),
334                        )?;
335
336                        inserts += 1;
337
338                        if self.log_size {
339                            size += object_size;
340                        }
341                    }
342
343                    if !object_too_large && let Some((doc, _schema, _index, mut writer)) = search {
344                        writer.add_document(doc)?;
345                        writer.commit()?;
346                        writer.wait_merging_threads()?;
347                    }
348                }
349            }
350        }
351
352        if self.log_size {
353            tracing::info!(inserts, size = %bytesize::ByteSize(size).display().si_short());
354        } else {
355            tracing::info!(inserts);
356        }
357
358        txn.commit()?;
359
360        Ok(())
361    }
362
363    #[instrument(skip_all, fields(i, nm), err)]
364    pub fn get(&self, def_idx: u8, field_idx: u8, field_value: &[u8]) -> anyhow::Result<Bytes> {
365        if let Some(content_def) = self.content_defs.get(def_idx as usize) {
366            let curr_span = tracing::Span::current();
367            curr_span.record("i", content_def.idx);
368            curr_span.record("nm", tracing::field::display(&content_def.name));
369
370            if let Some(field) = content_def.fields.get(field_idx as usize) {
371                match field.kind {
372                    Kind::String | Kind::Url => tracing::info!(
373                        f = field.idx,
374                        nm = %field.name,
375                        v = %std::str::from_utf8(field_value)?
376                    ),
377                    Kind::Uuid => {
378                        if field_value.len() == 16 {
379                            let uuid: [u8; 16] = field_value[0..16].try_into()?;
380                            tracing::info!(
381                                f = field.idx,
382                                nm = %field.name,
383                                v = Uuid::from_bytes(uuid).to_string()
384                            );
385                        } else {
386                            bail!("uuid is not 16 bytes");
387                        }
388                    }
389                    _ => tracing::error!("kind {:?} not yet supported as indexable", field.kind),
390                }
391
392                let mut key = vec![def_idx, field_idx];
393                key.extend_from_slice(field_value);
394
395                let txn = ReadTransaction::new(self.env.clone())?;
396
397                let access = txn.access();
398                let result = access.get(&self.content_db, &key)?;
399
400                Ok(Bytes::copy_from_slice(result))
401            } else {
402                bail!("no field idx {field_idx} for content definition {def_idx}")
403            }
404        } else {
405            bail!("no content definition for idx {def_idx}")
406        }
407    }
408
409    #[instrument(skip_all, fields(i, nm), err)]
410    pub fn list(&self, def_idx: u8) -> anyhow::Result<Bytes> {
411        if let Some(content_def) = self.content_defs.get(def_idx as usize) {
412            let curr_span = tracing::Span::current();
413            curr_span.record("i", content_def.idx);
414            curr_span.record("nm", tracing::field::display(&content_def.name));
415
416            let mut count = 0;
417
418            let mut objects_builder =
419                flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
420            let mut objects_vec = objects_builder.start_vector();
421
422            let txn = ReadTransaction::new(self.env.clone())?;
423            let access = txn.access();
424
425            let mut object_cursor = txn.cursor(self.content_db.clone())?;
426
427            let (k, v) = object_cursor.seek_range_k::<[u8], [u8]>(&access, &[def_idx])?;
428
429            count += 1;
430            objects_vec.push(flexbuffers::Blob(v));
431
432            let initial_field = k[1];
433
434            while let Ok((k, v)) = object_cursor.next::<[u8], [u8]>(&access) {
435                if k[1] != initial_field {
436                    break;
437                }
438
439                if k[0] == def_idx {
440                    count += 1;
441                    objects_vec.push(flexbuffers::Blob(v));
442                } else {
443                    break;
444                }
445            }
446
447            objects_vec.end_vector();
448            tracing::info!(ct = count);
449
450            return Ok(Bytes::copy_from_slice(objects_builder.view()));
451        }
452
453        bail!("no content definition for idx {def_idx}")
454    }
455
456    #[instrument(skip_all, fields(i, nm), err)]
457    pub fn search_content(
458        &self,
459        def_idx: u8,
460        field_idx: u8,
461        value: &[u8],
462    ) -> anyhow::Result<Bytes> {
463        if let Some(content_def) = self.content_defs.get(def_idx as usize) {
464            let curr_span = tracing::Span::current();
465            curr_span.record("i", content_def.idx);
466            curr_span.record("nm", tracing::field::display(&content_def.name));
467
468            if let Some(first_indexed_field) =
469                content_def.fields.iter().find(|v| v.indexed == Some(true))
470            {
471                if let Some(searchable) = self.content_search_indexes.get(def_idx as usize) {
472                    #[allow(clippy::collapsible_match)]
473                    if let Some((schema, _index, reader)) = searchable {
474                        if let Some(content_field) = content_def.fields.get(field_idx as usize) {
475                            let field = schema.get_field(&field_idx.to_string())?;
476
477                            let term = get_term(&content_field.kind, field, value)?;
478                            let query = FuzzyTermQuery::new(term, 2, true);
479
480                            let searcher = reader.searcher();
481                            let docs = searcher
482                                .search(&query, &TopDocs::with_limit(255).order_by_score())?;
483
484                            let mut objects_builder =
485                                flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
486                            let mut objects_vec = objects_builder.start_vector();
487
488                            let txn = ReadTransaction::new(self.env.clone())?;
489                            let access = txn.access();
490
491                            tracing::info!(ct = docs.len());
492
493                            for (_score, doc_addr) in docs {
494                                let doc: TantivyDocument = searcher.doc(doc_addr)?;
495                                if let Some(indexed_val) = doc.get_first(
496                                    schema.get_field(&first_indexed_field.idx.to_string())?,
497                                ) && let Some(indexed_val) = indexed_val.as_bytes()
498                                {
499                                    let v =
500                                        access.get::<[u8], [u8]>(&self.content_db, indexed_val)?;
501
502                                    objects_vec.push(flexbuffers::Blob(v));
503                                }
504                            }
505
506                            objects_vec.end_vector();
507
508                            Ok(Bytes::copy_from_slice(objects_builder.view()))
509                        } else {
510                            bail!("no field idx {field_idx} for content {def_idx}")
511                        }
512                    } else {
513                        bail!("content {def_idx} has no searchable fields")
514                    }
515                } else {
516                    bail!("content {def_idx} has no searchable fields")
517                }
518            } else {
519                bail!("content {def_idx} has no indexed fields")
520            }
521        } else {
522            bail!("no content definition at idx {def_idx}")
523        }
524    }
525}