sqlite-graphrag 1.0.67

Local GraphRAG memory for LLMs in a single SQLite file
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! fastembed wrapper and per-process embedding model singleton.
//!
//! Owns the in-process `TextEmbedding` model and exposes batch encode/query
//! helpers used by remember, recall, and related commands.
// Workload: CPU-bound (ONNX inference, matrix multiplication via fastembed)

use crate::constants::{
    EMBEDDING_DIM, EMBEDDING_MAX_TOKENS, FASTEMBED_BATCH_SIZE, PASSAGE_PREFIX, QUERY_PREFIX,
    REMEMBER_MAX_CONTROLLED_BATCH_CHUNKS, REMEMBER_MAX_CONTROLLED_BATCH_PADDED_TOKENS,
};
use crate::errors::AppError;
use fastembed::{EmbeddingModel, ExecutionProviderDispatch, TextEmbedding, TextInitOptions};
use ort::ep::CPU;
use parking_lot::Mutex;
use std::path::Path;
use std::sync::OnceLock;

/// Process-wide singleton embedding model behind a `Mutex`.
///
/// Populated lazily by [`get_embedder`]; being a `static` `OnceLock` that is never
/// cleared, the model stays loaded for the rest of the process once set. The lock
/// is a `parking_lot::Mutex`, which does not poison.
///
/// ONNX Runtime's `Session` is not guaranteed thread-safe for concurrent
/// inference; `Mutex` serialises all embedding calls.  This is correct by
/// design — without the daemon, embedding throughput is intentionally serial.
///
/// For parallel workloads (enrich, ingest) start the daemon first:
/// `sqlite-graphrag daemon` — the model is loaded once and served via UDS,
/// eliminating Mutex contention across CLI invocations.
static EMBEDDER: OnceLock<Mutex<TextEmbedding>> = OnceLock::new();

/// Returns the process-wide singleton embedder, initializing it on first call.
/// Subsequent calls return the cached instance regardless of `models_dir`.
///
/// # Errors
///
/// - [`AppError::Embedding`] — ONNX model load failure or runtime initialisation error.
/// - [`AppError::Io`] — cache directory is inaccessible or cannot be created.
pub fn get_embedder(models_dir: &Path) -> Result<&'static Mutex<TextEmbedding>, AppError> {
    if let Some(m) = EMBEDDER.get() {
        return Ok(m);
    }

    maybe_init_dynamic_ort(models_dir)?;

    // Multi-layer mitigation of the explosive RSS observed with variable-shape
    // payloads. The three current layers are:
    //   1. `with_arena_allocator(false)` on the CPU execution provider (line below)
    //   2. env var `ORT_DISABLE_CPU_MEM_ARENA=1` in `main.rs` (default since v1.0.18)
    //   3. env var `ORT_NUM_THREADS=1` + `ORT_INTRA_OP_NUM_THREADS=1` in `main.rs`
    // The `with_memory_pattern(false)` flag exists in ort 2.0 (`SessionBuilder`)
    // but fastembed 5.13.2 does NOT expose access to a custom SessionBuilder via
    // `TextInitOptions`. If RSS grows again in real corpora, the next
    // mitigation requires one of the following paths:
    //   - Fork fastembed to expose `SessionBuilder::with_memory_pattern(false)`
    //   - Bypass fastembed and use ort directly with a custom SessionBuilder
    //   - Fixed padding in `plan_controlled_batches` to eliminate variable shapes
    // References:
    //   https://onnxruntime.ai/docs/performance/tune-performance/memory.html
    //   https://github.com/qdrant/fastembed/issues/570
    let cpu_ep: ExecutionProviderDispatch = CPU::default().with_arena_allocator(false).build();

    let model = TextEmbedding::try_new(
        TextInitOptions::new(EmbeddingModel::MultilingualE5Small)
            .with_execution_providers(vec![cpu_ep])
            .with_max_length(EMBEDDING_MAX_TOKENS)
            .with_show_download_progress(true)
            .with_cache_dir(models_dir.to_path_buf()),
    )
    .map_err(|e| AppError::Embedding(e.to_string()))?;
    // If another thread raced and won, discard our instance and return theirs.
    let _ = EMBEDDER.set(Mutex::new(model));
    EMBEDDER.get().ok_or_else(|| {
        AppError::Embedding(
            "embedder OnceLock unexpectedly empty after set() (likely a racing initializer aborted before completion)"
                .into(),
        )
    })
}

/// Configures ONNX Runtime to load `libonnxruntime.so` dynamically on aarch64 Linux (glibc).
///
/// Candidate library paths are tried in this order:
/// 1. `$ORT_DYLIB_PATH`, when set and non-empty;
/// 2. `libonnxruntime.so` in the directory of the current executable;
/// 3. `lib/libonnxruntime.so` under that directory;
/// 4. `libonnxruntime.so` inside `models_dir`.
///
/// The first candidate that exists on disk is exported as `ORT_DYLIB_PATH` and passed
/// to `ort::init_from`; later candidates are not examined. If no candidate exists the
/// function returns `Ok(())` without configuring anything.
///
/// # Errors
///
/// Returns [`AppError::Embedding`] when `ort::init_from` fails for the chosen path.
#[cfg(all(target_arch = "aarch64", target_os = "linux", target_env = "gnu"))]
fn maybe_init_dynamic_ort(models_dir: &Path) -> Result<(), AppError> {
    let mut candidates = Vec::with_capacity(4);

    // An explicit user override is tried before any bundled location.
    if let Ok(path) = std::env::var("ORT_DYLIB_PATH") {
        if !path.is_empty() {
            candidates.push(std::path::PathBuf::from(path));
        }
    }

    // Libraries shipped alongside the binary (flat layout and `lib/` layout).
    if let Ok(exe) = std::env::current_exe() {
        if let Some(dir) = exe.parent() {
            candidates.push(dir.join("libonnxruntime.so"));
            candidates.push(dir.join("lib").join("libonnxruntime.so"));
        }
    }

    // Last resort: a library placed next to the downloaded model files.
    candidates.push(models_dir.join("libonnxruntime.so"));

    for path in candidates {
        // Missing candidates (including a stale `ORT_DYLIB_PATH`) are skipped silently.
        if !path.exists() {
            continue;
        }

        // NOTE(review): `set_var` mutates process-global state and is racy if other
        // threads read the environment concurrently (it is `unsafe` in Rust 2024).
        std::env::set_var("ORT_DYLIB_PATH", &path);
        // Only an `init_from` failure is propagated; the value returned by
        // `commit()` is deliberately discarded.
        let _ = ort::init_from(&path)
            .map_err(|e| AppError::Embedding(e.to_string()))?
            .commit();
        return Ok(());
    }

    Ok(())
}

/// No-op on every target other than aarch64 Linux (glibc): nothing is configured
/// and `Ok(())` is always returned.
#[cfg(not(all(target_arch = "aarch64", target_os = "linux", target_env = "gnu")))]
fn maybe_init_dynamic_ort(_models_dir: &Path) -> Result<(), AppError> {
    Ok(())
}

/// Embeds a single passage using the `passage:` prefix required by E5 models.
///
/// The model lock is held only for the duration of the inference call.
///
/// # Errors
/// Returns [`AppError::Embedding`] when inference fails, when the model returns no
/// vector, or when the returned vector's length differs from [`EMBEDDING_DIM`].
#[tracing::instrument(skip(embedder, text), fields(text_len = text.len()))]
pub fn embed_passage(embedder: &Mutex<TextEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
    let prefixed = format!("{PASSAGE_PREFIX}{text}");
    // The temporary guard from `lock()` is dropped at the end of this statement.
    let results = embedder
        .lock()
        .embed(vec![prefixed.as_str()], Some(1))
        .map_err(|e| AppError::Embedding(e.to_string()))?;
    let emb = results
        .into_iter()
        .next()
        .ok_or_else(|| AppError::Embedding("empty embedding result".into()))?;
    // A wrong dimension would corrupt sqlite-vec storage; report it instead of panicking.
    if emb.len() != EMBEDDING_DIM {
        return Err(AppError::Embedding(format!(
            "unexpected embedding dimension: expected {EMBEDDING_DIM}, got {}",
            emb.len()
        )));
    }
    Ok(emb)
}

/// Embeds a search query using the `query:` prefix required by E5 models.
///
/// # Errors
/// Returns `Err` when the model returns an unexpected result.
#[tracing::instrument(skip(embedder, text), fields(text_len = text.len()))]
pub fn embed_query(embedder: &Mutex<TextEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
    let prefixed = format!("{QUERY_PREFIX}{text}");
    let results = embedder
        .lock()
        .embed(vec![prefixed.as_str()], Some(1))
        .map_err(|e| AppError::Embedding(e.to_string()))?;
    let emb = results
        .into_iter()
        .next()
        .ok_or_else(|| AppError::Embedding("empty embedding result".into()))?;
    Ok(emb)
}

/// Embeds multiple passages in a single ONNX batch call.
///
/// Every text receives the `passage:` prefix. Before `batch_size` is handed to
/// fastembed it is clamped to `1..=FASTEMBED_BATCH_SIZE`. The upper bound caps
/// per-call memory. The lower bound ensures a caller-supplied `0` is never forwarded
/// as the batch size. An empty `texts` slice returns an empty vector without locking
/// or touching the model.
///
/// On success, `result[i]` is the embedding of `texts[i]`.
///
/// # Errors
/// Returns [`AppError::Embedding`] when inference fails, when the model returns a
/// different number of vectors than texts supplied, or when any vector's length
/// differs from [`EMBEDDING_DIM`].
#[tracing::instrument(skip(embedder, texts), fields(text_count = texts.len()))]
pub fn embed_passages_batch(
    embedder: &Mutex<TextEmbedding>,
    texts: &[&str],
    batch_size: usize,
) -> Result<Vec<Vec<f32>>, AppError> {
    if texts.is_empty() {
        return Ok(Vec::new());
    }

    let prefixed: Vec<String> = texts
        .iter()
        .map(|t| format!("{PASSAGE_PREFIX}{t}"))
        .collect();
    let strs: Vec<&str> = prefixed.iter().map(String::as_str).collect();
    // `min` then `max` (not `clamp`) so this cannot panic even if the cap were 0.
    let effective_batch = batch_size.min(FASTEMBED_BATCH_SIZE).max(1);
    let results = embedder
        .lock()
        .embed(strs, Some(effective_batch))
        .map_err(|e| AppError::Embedding(e.to_string()))?;

    // Callers pair results with inputs by index, so a count mismatch must not slip through.
    if results.len() != texts.len() {
        return Err(AppError::Embedding(format!(
            "embedding count mismatch: expected {}, got {}",
            texts.len(),
            results.len()
        )));
    }
    if let Some(bad) = results.iter().find(|emb| emb.len() != EMBEDDING_DIM) {
        return Err(AppError::Embedding(format!(
            "unexpected embedding dimension: expected {EMBEDDING_DIM}, got {}",
            bad.len()
        )));
    }
    Ok(results)
}

/// Counts the batches [`embed_passages_controlled`] would run for `token_counts`.
///
/// Only the batch planner is executed: no model is needed and no inference happens.
pub fn controlled_batch_count(token_counts: &[usize]) -> usize {
    let plan = plan_controlled_batches(token_counts);
    plan.len()
}

/// Embeds passages grouped into token-budget-aware batches to avoid OOM on variable-length inputs.
///
/// `texts` and `token_counts` must have the same length; `token_counts[i]` is the
/// caller's token estimate for `texts[i]`. An internal planner groups consecutive
/// passages into batches. Each batch is capped both in chunk count and in padded size
/// (longest chunk × batch length). Single-item batches go through [`embed_passage`];
/// larger ones go through [`embed_passages_batch`].
///
/// Batches are contiguous ranges processed in order, so the output preserves the
/// input order.
///
/// # Errors
/// Returns [`AppError::Internal`] when the two slices differ in length, and
/// [`AppError::Embedding`] when inference fails for any batch. The model lock is a
/// `parking_lot::Mutex`, which never poisons, so acquiring it cannot fail.
pub fn embed_passages_controlled(
    embedder: &Mutex<TextEmbedding>,
    texts: &[&str],
    token_counts: &[usize],
) -> Result<Vec<Vec<f32>>, AppError> {
    if texts.len() != token_counts.len() {
        return Err(AppError::Internal(anyhow::anyhow!(
            "texts/token_counts length mismatch in controlled embedding"
        )));
    }

    let mut results = Vec::with_capacity(texts.len());
    for (start, end) in plan_controlled_batches(token_counts) {
        let batch = &texts[start..end];
        if let [single] = batch {
            // A batch of one gains nothing from padding; use the single-passage path.
            results.push(embed_passage(embedder, single)?);
        } else {
            results.extend(embed_passages_batch(embedder, batch, batch.len())?);
        }
    }

    Ok(results)
}

/// Embeds passages one at a time, each through its own single-item ONNX call.
///
/// Running them serially is **deliberate**. Batched ONNX inference can misbehave badly
/// on real-world Markdown chunks, because variable token lengths cause extreme
/// padding overhead. Callers that need parallelism should use the rayon
/// `ThreadPool` in `src/commands/ingest.rs::run`, which partitions work across
/// CPU threads and calls this function per shard.
///
/// Processing stops at the first failing passage.
///
/// # Errors
///
/// Returns [`AppError::Embedding`] when the ONNX encoder fails on any passage.
pub fn embed_passages_serial<'a, I>(
    embedder: &Mutex<TextEmbedding>,
    texts: I,
) -> Result<Vec<Vec<f32>>, AppError>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut passages = texts.into_iter();
    let reserve = passages.size_hint().0;
    passages.try_fold(Vec::with_capacity(reserve), |mut embeddings, passage| {
        embeddings.push(embed_passage(embedder, passage)?);
        Ok(embeddings)
    })
}

/// Splits `token_counts` into contiguous half-open `(start, end)` index ranges.
///
/// Each range holds at most `REMEMBER_MAX_CONTROLLED_BATCH_CHUNKS` items, and its
/// padded size stays within `REMEMBER_MAX_CONTROLLED_BATCH_PADDED_TOKENS`. Padded size
/// is the largest token count in the range (zero counted as one) multiplied by the
/// range length. A range always contains at least one item, even if that single item
/// alone exceeds the budget.
fn plan_controlled_batches(token_counts: &[usize]) -> Vec<(usize, usize)> {
    let total = token_counts.len();
    let mut plan =
        Vec::with_capacity((total / REMEMBER_MAX_CONTROLLED_BATCH_CHUNKS).max(1));
    let mut lo = 0usize;

    while lo < total {
        // The first chunk always opens the batch.
        let mut widest = token_counts[lo].max(1);
        let mut hi = lo + 1;

        loop {
            let size = hi - lo;
            if hi >= total || size >= REMEMBER_MAX_CONTROLLED_BATCH_CHUNKS {
                break;
            }
            // Try to grow the batch by one chunk; stop if padding would exceed the budget.
            let widened = widest.max(token_counts[hi].max(1));
            if widened * (size + 1) > REMEMBER_MAX_CONTROLLED_BATCH_PADDED_TOKENS {
                break;
            }
            widest = widened;
            hi += 1;
        }

        plan.push((lo, hi));
        lo = hi;
    }

    plan
}

// sqlite-vec stores little-endian f32 values; refuse to build where the native
// layout would differ, so `f32_to_bytes` never needs to byte-swap.
#[cfg(target_endian = "big")]
compile_error!(
    "sqlite-graphrag requires little-endian f32 layout for sqlite-vec compatibility. \
     Big-endian targets (PPC64, S390x) are not supported."
);

/// Reinterprets `&[f32]` as its raw bytes for sqlite-vec storage, without copying.
///
/// The result is `4 * v.len()` bytes long. Each element appears in native byte order,
/// which the `compile_error!` guard above pins to little-endian. The bytes are
/// therefore identical to concatenating `f32::to_le_bytes` for every element.
///
/// # Soundness
///
/// The `unsafe` reinterpretation inside is sound because:
/// 1. `f32` has no padding bytes per the Rust reference
///    (<https://doc.rust-lang.org/reference/types/numeric.html>), so every byte of
///    `[f32]` is initialised and it can be viewed as `[u8; size_of_val(v)]`.
/// 2. `u8` has alignment 1, so the `f32` pointer is suitably aligned for `u8`.
/// 3. The returned `&[u8]` borrows from `v` (lifetime elision ties them together),
///    so it cannot outlive or alias a mutation of the input.
pub fn f32_to_bytes(v: &[f32]) -> &[u8] {
    // SAFETY: pointer, length and lifetime all come from the valid slice `v`; see the
    // "Soundness" section above for why viewing its bytes as `u8` is sound.
    unsafe { std::slice::from_raw_parts(v.as_ptr().cast::<u8>(), std::mem::size_of_val(v)) }
}

// Unit tests for the embedder module. Pure helpers (byte conversion, batch planning,
// constants) are always tested; tests that load the real model are `#[ignore]`d.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constants::{EMBEDDING_DIM, PASSAGE_PREFIX, QUERY_PREFIX};

    // --- f32_to_bytes tests (pure function, no model) ---

    #[test]
    fn f32_to_bytes_empty_slice_returns_empty() {
        let v: Vec<f32> = vec![];
        assert_eq!(f32_to_bytes(&v), &[] as &[u8]);
    }

    #[test]
    fn f32_to_bytes_one_element_returns_4_bytes() {
        let v = vec![1.0_f32];
        let bytes = f32_to_bytes(&v);
        assert_eq!(bytes.len(), 4);
        // roundtrip: the 4 bytes must reconstruct the original f32
        let recovered = f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
        assert_eq!(recovered, 1.0_f32);
    }

    #[test]
    fn f32_to_bytes_length_is_4x_elements() {
        let v = vec![0.0_f32, 1.0, 2.0, 3.0];
        assert_eq!(f32_to_bytes(&v).len(), v.len() * 4);
    }

    #[test]
    fn f32_to_bytes_zero_encoded_as_4_zeros() {
        let v = vec![0.0_f32];
        assert_eq!(f32_to_bytes(&v), &[0u8, 0, 0, 0]);
    }

    #[test]
    fn f32_to_bytes_roundtrip_vector_embedding_dim() {
        let v: Vec<f32> = (0..EMBEDDING_DIM).map(|i| i as f32 * 0.001).collect();
        let bytes = f32_to_bytes(&v);
        assert_eq!(bytes.len(), EMBEDDING_DIM * 4);
        // reconstructs and compares first and last element
        let first = f32::from_le_bytes(bytes[0..4].try_into().unwrap());
        assert!((first - 0.0_f32).abs() < 1e-6);
        let last_start = (EMBEDDING_DIM - 1) * 4;
        let last = f32::from_le_bytes(bytes[last_start..last_start + 4].try_into().unwrap());
        assert!((last - (EMBEDDING_DIM - 1) as f32 * 0.001).abs() < 1e-4);
    }

    // --- verifies prefixes used by the embedder (no model) ---
    // These pin the exact prefix strings, not merely that they are non-empty.

    #[test]
    fn passage_prefix_not_empty() {
        assert_eq!(PASSAGE_PREFIX, "passage: ");
    }

    #[test]
    fn query_prefix_not_empty() {
        assert_eq!(QUERY_PREFIX, "query: ");
    }

    #[test]
    fn embedding_dim_is_384() {
        assert_eq!(EMBEDDING_DIM, 384);
    }

    // --- tests with the real model (ignored in normal CI) ---

    #[test]
    #[ignore = "requires ~600 MB model on disk; run with --include-ignored"]
    fn embed_passage_returns_vector_with_correct_dimension() {
        let dir = tempfile::tempdir().unwrap();
        let embedder = get_embedder(dir.path()).unwrap();
        let result = embed_passage(embedder, "test text").unwrap();
        assert_eq!(result.len(), EMBEDDING_DIM);
    }

    #[test]
    #[ignore = "requires ~600 MB model on disk; run with --include-ignored"]
    fn embed_query_returns_vector_with_correct_dimension() {
        let dir = tempfile::tempdir().unwrap();
        let embedder = get_embedder(dir.path()).unwrap();
        let result = embed_query(embedder, "test query").unwrap();
        assert_eq!(result.len(), EMBEDDING_DIM);
    }

    #[test]
    #[ignore = "requires ~600 MB model on disk; run with --include-ignored"]
    fn embed_passages_batch_returns_one_vector_per_text() {
        let dir = tempfile::tempdir().unwrap();
        let embedder = get_embedder(dir.path()).unwrap();
        // Portuguese sample words ("first", "second"); any two short texts work.
        let textos = ["primeiro", "segundo"];
        let results = embed_passages_batch(embedder, &textos, 2).unwrap();
        assert_eq!(results.len(), 2);
        for emb in &results {
            assert_eq!(emb.len(), EMBEDDING_DIM);
        }
    }

    // --- batch planner (pure, no model) ---

    // Four 100-token chunks fit in one batch; each 300-token chunk lands in its own
    // batch because pairing it with any neighbour would exceed the padded-token budget.
    #[test]
    fn controlled_batch_plan_respects_budget() {
        assert_eq!(
            plan_controlled_batches(&[100, 100, 100, 100, 300, 300]),
            vec![(0, 4), (4, 5), (5, 6)]
        );
    }

    #[test]
    fn controlled_batch_count_returns_one_for_single_chunk() {
        assert_eq!(controlled_batch_count(&[350]), 1);
    }
}