tinycortex 0.1.1

Rust core for the TinyCortex memory system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
//! In-memory inverted index for `search_cross_thread_messages`.
//!
//! ## Architecture (v1)
//!
//! ```text
//!                    ┌───────────────┐
//!  query "cat"  ───▶ │  Phase 1:     │  posting-list intersection
//!                    │  ngram lookup │  on character n-grams
//!                    └──────┬────────┘
//!                           │ candidate doc ids per term
//!                    ┌──────▼────────┐
//!                    │  Phase 2:     │  exact substring verify on
//!                    │  verify+score │  normalized content, score
//!                    └──────┬────────┘  by `matched_terms / total_terms`
//!//!//!                       Vec<Hit>
//! ```
//!
//! ## Ownership choices for scale
//!
//! Conversation corpora can grow to hundreds of thousands of messages per
//! workspace. The data structures here are picked to keep the resident-set
//! size predictable at that scale:
//!
//! - **`thread_id` and `role` are interned `Arc<str>`** (see
//!   `intern_thread_id` / `intern_role`). A workspace with one thread of
//!   N messages would otherwise store N copies of the same thread id; a
//!   role string only ever takes two distinct values in practice. The
//!   interner amortises both to a single heap allocation per distinct
//!   value, plus one `Arc` clone per `DocEntry`.
//! - **Posting-map keys are `Box<str>`** (16 bytes) rather than `String`
//!   (24 bytes). Saves 8 bytes per distinct ngram in the corpus — at
//!   ~17k Latin trigrams plus CJK bigrams that adds up.
//! - **Posting lists are still `BTreeSet<u32>`** for ergonomic ordered
//!   iteration. The Phase 1 intersection is performed against a
//!   single-allocation `Vec<u32>` accumulator via a two-pointer
//!   sort-merge (no per-iteration `BTreeSet` rebuilds), so the BTreeSet
//!   shape only affects insertion and removal, not query latency.
//!   Roaring Bitmaps + FST + LSM segments are the long-term destination
//!   (Gemini Deep Research write-up); we defer that until corpus sizes
//!   justify the complexity.
//! - **Whole index lives in RAM**, rebuilt from JSONL on first access in
//!   the process. The JSONL files remain the source of truth.
//! - **Scoring matches the previous linear scan**:
//!   `score = matched_terms / total_terms` with a `created_at` tiebreaker.
//! - **Pathological query short-circuit**: if Phase 1 produces a
//!   candidate set larger than `LARGE_CANDIDATE_LIMIT` for any term, the
//!   index returns recency-ordered hits without running Phase 2. This
//!   genuinely caps tail latency — the check fires *before* the
//!   substring-verification loop, not after.

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;

use super::tokenize::{ngrams, normalize};
use super::types::{ConversationMessage, CrossThreadHit};

/// Minimum byte length for a query term to be considered. Matches the
/// historical behaviour of `search_cross_thread_messages` so existing
/// callers (and tests) see no change. Single-byte ASCII tokens like "a"
/// or "is" are filtered out; a single CJK character (3 bytes in UTF-8)
/// passes through.
const MIN_TERM_BYTES: usize = 3;

/// When Phase 1 returns more than this many candidates we skip Phase 2
/// verification and fall back to a pure recency-ranked truncation. This
/// is the mitigation for the "user types `e`" pathological case.
const LARGE_CANDIDATE_LIMIT: usize = 10_000;

/// One indexed message. Carries enough state to (a) reconstruct a
/// `CrossThreadHit` without re-reading JSONL on the hot path and (b)
/// verify Phase 1 candidates by exact substring match on the normalized
/// form.
///
/// `thread_id` and `role` are `Arc<str>` because they repeat heavily
/// across messages (N messages per thread → N references to the same
/// thread id; only ~2 distinct role values across the entire corpus).
/// `message_id`, `content`, `content_normalized` and `created_at` are
/// per-message unique so they stay as `String`.
#[derive(Debug, Clone)]
struct DocEntry {
    thread_id: Arc<str>,
    message_id: String,
    role: Arc<str>,
    content: String,            // original, returned verbatim in hits
    content_normalized: String, // for Phase 2 substring verification
    created_at: String,
}

/// In-memory trigram/bigram inverted index over conversation messages.
///
/// Documents are addressed by a dense `u32` doc-id assigned in insertion
/// order. Deletes leave tombstones (`docs[i] = None`) rather than shifting
/// the array, so posting-list integers stay valid without rebuilding.
#[derive(Debug, Default)]
pub(crate) struct InvertedIndex {
    /// `ngram -> sorted set of doc-ids`. BTreeSet so per-doc removals
    /// are O(log n) and iteration is in sorted order (drives the
    /// sort-merge intersect in `candidates_for_term`). Keys are
    /// `Box<str>` to shave 8 bytes per entry vs `String`.
    postings: HashMap<Box<str>, BTreeSet<u32>>,
    /// Tombstoned: `docs[i] == None` means the message was deleted. We
    /// keep the slot so existing doc-ids in posting lists stay valid.
    docs: Vec<Option<DocEntry>>,
    /// Reverse lookup for incremental removal: `(thread_id, message_id)`
    /// → `doc_id`. Letting us drop a single message without re-walking
    /// the corpus.
    by_message: HashMap<(String, String), u32>,
    /// Interner pools. Keep a single `Arc<str>` per distinct thread id
    /// and role so every `DocEntry` referencing them can hold a cheap
    /// 16-byte `Arc` clone instead of a 24-byte `String`.
    thread_id_pool: HashMap<String, Arc<str>>,
    role_pool: HashMap<String, Arc<str>>,
}

impl InvertedIndex {
    pub fn new() -> Self {
        Self::default()
    }

    /// Insert one message. Takes the message by value so the caller's
    /// owned strings can be moved into the index without an internal
    /// clone of each field. If the (thread, message_id) pair is already
    /// in the index this is a no-op (messages are append-only in the
    /// store, so duplicate IDs indicate a corrupt JSONL — silently
    /// ignore rather than panic).
    pub fn insert(&mut self, thread_id: &str, msg: ConversationMessage) {
        let ConversationMessage {
            id,
            content,
            sender,
            created_at,
            message_type: _,
            extra_metadata: _,
        } = msg;

        let key = (thread_id.to_string(), id.clone());
        if self.by_message.contains_key(&key) {
            return;
        }
        let normalized = normalize(&content);
        let doc_id = self.docs.len() as u32;
        for ngram in ngrams(&normalized) {
            if let Some(posting) = self.postings.get_mut(ngram) {
                posting.insert(doc_id);
            } else {
                let mut set = BTreeSet::new();
                set.insert(doc_id);
                self.postings.insert(ngram.into(), set);
            }
        }
        let thread_arc = self.intern_thread_id(thread_id);
        let role_arc = self.intern_role(&sender);
        self.docs.push(Some(DocEntry {
            thread_id: thread_arc,
            message_id: id,
            role: role_arc,
            content,
            content_normalized: normalized,
            created_at,
        }));
        self.by_message.insert(key, doc_id);
    }

    /// Drop every document belonging to a thread. Used by
    /// `delete_thread` and during full purge.
    pub fn remove_thread(&mut self, thread_id: &str) {
        let to_remove: Vec<u32> = self
            .by_message
            .iter()
            .filter(|((t, _), _)| t == thread_id)
            .map(|(_, id)| *id)
            .collect();
        for doc_id in to_remove {
            self.remove_doc(doc_id);
        }
        self.thread_id_pool.remove(thread_id);
    }

    /// Reset the index to its empty state. Cheaper than dropping and
    /// re-allocating when a workspace is being rebuilt. Retained from the
    /// OpenHuman port for callers that rebuild in place.
    #[allow(dead_code)]
    pub fn clear(&mut self) {
        self.postings.clear();
        self.docs.clear();
        self.by_message.clear();
        self.thread_id_pool.clear();
        self.role_pool.clear();
    }

    fn remove_doc(&mut self, doc_id: u32) {
        let idx = doc_id as usize;
        let Some(entry) = self.docs.get_mut(idx).and_then(|slot| slot.take()) else {
            return;
        };
        self.by_message
            .remove(&(entry.thread_id.to_string(), entry.message_id.clone()));
        // Remove doc_id from every posting list referencing it. We re-
        // tokenize the normalized content rather than tracking the
        // per-doc ngram set; tokenization is allocation-free now that
        // `ngrams` returns borrowed slices, so this stays cheap.
        for ngram in ngrams(&entry.content_normalized) {
            if let Some(posting) = self.postings.get_mut(ngram) {
                posting.remove(&doc_id);
                if posting.is_empty() {
                    self.postings.remove(ngram);
                }
            }
        }
    }

    /// The Phase 1 + Phase 2 query pipeline. Mirrors the contract of
    /// `ConversationStore::search_cross_thread_messages` so the store
    /// method can be a thin shim.
    pub fn search(
        &self,
        query: &str,
        limit: usize,
        exclude_thread_id: Option<&str>,
    ) -> Vec<CrossThreadHit> {
        if limit == 0 {
            return Vec::new();
        }
        let query_lower = normalize(query);
        // Filter terms by raw byte length (matches the historical
        // 3-byte threshold; single CJK chars are 3 bytes and pass).
        let terms: Vec<String> = query_lower
            .split_whitespace()
            .filter(|t| t.len() >= MIN_TERM_BYTES)
            .map(|s| s.to_string())
            .collect();
        if terms.is_empty() {
            return Vec::new();
        }

        // Phase 1: collect candidate doc-ids per term. Short-circuit to
        // recency-only ordering if any single term's candidate set
        // already exceeds the pathological threshold — this is the cap
        // on tail latency, and it must fire BEFORE we run the substring
        // verification loop.
        let mut per_term: Vec<Vec<u32>> = Vec::with_capacity(terms.len());
        for term in &terms {
            let candidates = match self.candidates_for_term(term) {
                Some(v) => v,
                None => self
                    .docs
                    .iter()
                    .enumerate()
                    .filter_map(|(i, slot)| slot.as_ref().map(|_| i as u32))
                    .collect::<Vec<u32>>(),
            };
            if candidates.len() > LARGE_CANDIDATE_LIMIT {
                return self.recency_fallback(exclude_thread_id, limit);
            }
            per_term.push(candidates);
        }

        // Phase 2: verify each candidate by exact substring match.
        // Count distinct terms per doc for the score.
        let mut hit_counts: HashMap<u32, usize> = HashMap::new();
        for (term, candidates) in terms.iter().zip(per_term.into_iter()) {
            for doc_id in candidates {
                let Some(entry) = self.docs[doc_id as usize].as_ref() else {
                    continue;
                };
                if exclude_thread_id == Some(entry.thread_id.as_ref()) {
                    continue;
                }
                if entry.content_normalized.contains(term.as_str()) {
                    *hit_counts.entry(doc_id).or_insert(0) += 1;
                }
            }
        }

        let total_terms = terms.len() as f64;
        let mut hits: Vec<CrossThreadHit> = hit_counts
            .into_iter()
            .map(|(doc_id, matched)| {
                let entry = self.docs[doc_id as usize]
                    .as_ref()
                    .expect("doc_id from hit_counts must be live");
                CrossThreadHit {
                    thread_id: entry.thread_id.to_string(),
                    message_id: entry.message_id.clone(),
                    role: entry.role.to_string(),
                    content: entry.content.clone(),
                    created_at: entry.created_at.clone(),
                    score: matched as f64 / total_terms,
                }
            })
            .collect();

        hits.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| b.created_at.cmp(&a.created_at))
        });
        hits.truncate(limit);
        hits
    }

    /// Build the Phase 1 candidate set for one query term.
    ///
    /// Returns `Some(vec)` containing the sorted intersection of posting
    /// lists for every ngram of `term`. If `term` is too short to
    /// produce any ngram (e.g. a single CJK char of length 1) returns
    /// `None` so the caller can fall back to a linear scan.
    ///
    /// The intersect is a two-pointer sort-merge over the already-sorted
    /// posting lists: `acc` is rewritten in place once per remaining
    /// ngram, allocating zero intermediate sets.
    fn candidates_for_term(&self, term: &str) -> Option<Vec<u32>> {
        let term_ngrams = ngrams(term);
        if term_ngrams.is_empty() {
            return None;
        }
        let mut iter = term_ngrams.iter();
        let first = iter.next().expect("non-empty by check above");
        let mut acc: Vec<u32> = match self.postings.get(*first) {
            Some(p) => p.iter().copied().collect(),
            None => return Some(Vec::new()),
        };
        for ng in iter {
            if acc.is_empty() {
                return Some(acc);
            }
            match self.postings.get(*ng) {
                Some(p) => intersect_sorted_with_btreeset(&mut acc, p),
                None => return Some(Vec::new()),
            }
        }
        Some(acc)
    }

    fn intern_thread_id(&mut self, thread_id: &str) -> Arc<str> {
        if let Some(existing) = self.thread_id_pool.get(thread_id) {
            return Arc::clone(existing);
        }
        let arc: Arc<str> = Arc::from(thread_id);
        self.thread_id_pool
            .insert(thread_id.to_string(), Arc::clone(&arc));
        arc
    }

    fn intern_role(&mut self, role: &str) -> Arc<str> {
        if let Some(existing) = self.role_pool.get(role) {
            return Arc::clone(existing);
        }
        let arc: Arc<str> = Arc::from(role);
        self.role_pool.insert(role.to_string(), Arc::clone(&arc));
        arc
    }

    fn recency_fallback(
        &self,
        exclude_thread_id: Option<&str>,
        limit: usize,
    ) -> Vec<CrossThreadHit> {
        let mut hits: Vec<CrossThreadHit> = self
            .docs
            .iter()
            .filter_map(|slot| slot.as_ref())
            .filter(|entry| exclude_thread_id != Some(entry.thread_id.as_ref()))
            .map(|entry| CrossThreadHit {
                thread_id: entry.thread_id.to_string(),
                message_id: entry.message_id.clone(),
                role: entry.role.to_string(),
                content: entry.content.clone(),
                created_at: entry.created_at.clone(),
                // Score 0.0 signals "matched via recency fallback only" —
                // documented in the function rustdoc above. Callers
                // sorting by `(score desc, created_at desc)` still see
                // the newest entries first.
                score: 0.0,
            })
            .collect();
        hits.sort_by(|a, b| b.created_at.cmp(&a.created_at));
        hits.truncate(limit);
        hits
    }
}

/// Two-pointer sort-merge intersect. `acc` and `other` are both sorted
/// ascending; on return `acc` contains only the elements present in
/// both, in the same sorted order. Runs in O(|acc| + |other|) with zero
/// allocations.
fn intersect_sorted_with_btreeset(acc: &mut Vec<u32>, other: &BTreeSet<u32>) {
    let mut other_iter = other.iter().copied().peekable();
    let mut write = 0usize;
    for read in 0..acc.len() {
        let target = acc[read];
        // Advance `other_iter` past everything strictly less than the
        // current `target`. After this loop the next peeked value is
        // either equal to `target` (keep) or strictly greater (drop).
        while let Some(&o) = other_iter.peek() {
            if o < target {
                other_iter.next();
            } else {
                break;
            }
        }
        if other_iter.peek().copied() == Some(target) {
            acc[write] = target;
            write += 1;
            other_iter.next();
        }
    }
    acc.truncate(write);
}

#[cfg(test)]
#[path = "inverted_index_tests.rs"]
mod tests;