// lantern 0.3.0
//
// Local-first, provenance-aware semantic search for agent activity.
// Documentation
//! Machine-readable dump of indexed content.
//!
//! `export` renders the full contents of the local store (or a filtered
//! subset) as a single JSON document. Every exported chunk carries its
//! provenance fields (byte range, char count, sha256, source id) so the dump
//! is a self-contained snapshot an agent can reason over offline.

use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use serde::Serialize;
use std::path::Path;

use crate::entities::{ChunkEntity, load_entities_for_chunk};
use crate::search::{self, ConfidenceBreakdown};
use crate::store::Store;

/// Top-level document produced by [`export`] and serialised by [`write_json`].
#[derive(Debug, Clone, Serialize)]
pub struct Export {
    /// Schema version reported by the store at export time.
    pub schema_version: i64,
    /// Unix timestamp (seconds) captured when the export started. The same
    /// instant is used as the reference time for every chunk's confidence.
    pub exported_at: i64,
    /// Echo of the filter this export was produced with.
    pub filter: FilterSnapshot,
    /// Matching sources, ordered by `ingested_at` descending, then `id`
    /// descending.
    pub sources: Vec<ExportedSource>,
}

/// Serialisable copy of the [`ExportFilter`] an export was run with, so the
/// dump records how it was selected.
#[derive(Debug, Clone, Serialize)]
pub struct FilterSnapshot {
    /// Copy of `ExportFilter::path_contains`.
    pub path: Option<String>,
    /// Copy of `ExportFilter::query` as supplied by the caller (not the
    /// normalised FTS expression derived from it).
    pub query: Option<String>,
}

/// One row of the `sources` table together with all of its chunks.
///
/// Every scalar field is read from the identically named column, except
/// `source_id`, which comes from `sources.id`.
#[derive(Debug, Clone, Serialize)]
pub struct ExportedSource {
    /// Primary key of the source (`sources.id`).
    pub source_id: String,
    /// `sources.uri`.
    pub uri: String,
    /// `sources.path`; nullable in the store.
    pub path: Option<String>,
    /// `sources.kind`.
    pub kind: String,
    /// `sources.bytes` — presumably the size of the ingested content in
    /// bytes; confirm against the ingest code.
    pub bytes: i64,
    /// `sources.content_sha256`.
    pub content_sha256: String,
    /// `sources.mtime_unix`; nullable in the store.
    pub mtime_unix: Option<i64>,
    /// `sources.ingested_at`; also the primary sort key of an export.
    pub ingested_at: i64,
    /// Chunks belonging to this source, ordered by `ordinal`.
    pub chunks: Vec<ExportedChunk>,
}

/// One row of the `chunks` table plus the confidence score derived from it.
///
/// Fields marked `skip_serializing_if = "Option::is_none"` are omitted from
/// the JSON output entirely when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub struct ExportedChunk {
    /// Primary key of the chunk (`chunks.id`).
    pub chunk_id: String,
    /// Position of the chunk within its source; chunks are loaded in this order.
    pub ordinal: i64,
    /// Start of the chunk's byte range (`chunks.byte_start`).
    pub byte_start: i64,
    /// End of the chunk's byte range (`chunks.byte_end`).
    /// NOTE(review): whether the end is inclusive is not visible here.
    pub byte_end: i64,
    /// `chunks.char_count`.
    pub char_count: i64,
    /// `chunks.sha256`.
    pub sha256: String,
    /// The chunk's text content.
    pub text: String,
    // The next seven fields are nullable per-chunk metadata columns, passed
    // through unchanged. NOTE(review): their names suggest conversation/tool
    // call context, but their exact semantics are defined by ingest code
    // outside this file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub role: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub turn_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_turn_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,
    /// `chunks.timestamp_unix`; also an input to the confidence computation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp_unix: Option<i64>,
    /// `chunks.access_count`; an input to the confidence computation.
    pub access_count: i64,
    /// `chunks.last_accessed_at`; an input to the confidence computation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_accessed_at: Option<i64>,
    /// `chunks.access_decay_at`; exported as stored and not used when
    /// computing `confidence` here.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub access_decay_at: Option<i64>,
    /// `chunks.feedback_score`; an input to the confidence computation.
    pub feedback_score: i64,
    /// `chunks.query_success_count`; an input to the confidence computation.
    pub query_success_count: i64,
    /// Score returned by `search::compute_confidence_breakdown`, evaluated at
    /// the reference time handed to the loader.
    pub confidence: f64,
    /// Per-factor breakdown returned alongside `confidence`.
    pub confidence_breakdown: ConfidenceBreakdown,
    /// Entities extracted from this chunk's text, populated only when the
    /// caller opted in via `LoadSourceOptions::with_entities`. `None` keeps the
    /// default JSON shape identical for callers that did not ask for entity
    /// evidence; an empty `Some(vec![])` means "asked, none found".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub entities: Option<Vec<ChunkEntity>>,
}

/// Optional decorations layered on top of [`load_source`]. The default keeps
/// the existing `Export` / `ExportedSource` wire shape unchanged; opting into
/// `with_entities` populates `ExportedChunk::entities` for callers like
/// `show --show-entities` that want chunk-level knowledge-graph evidence.
///
/// The options are consumed by [`load_source_with_options`]; [`load_source`]
/// always passes `LoadSourceOptions::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LoadSourceOptions {
    /// When `Some(n)` with `n > 0`, populate each chunk's `entities` field
    /// with up to `n` extracted entities (deterministic order). `Some(0)` is
    /// treated the same as `None` so flags wired through `--show-entities 0`
    /// or omitted CLI options stay on the cheap path.
    ///
    /// Entities are fetched with one `load_entities_for_chunk` call per chunk.
    pub with_entities: Option<usize>,
}

/// Selection criteria for [`export`].
///
/// With both fields `None` (the `Default`), every source is exported. When
/// both are set, a source must satisfy both to be included.
#[derive(Debug, Clone, Default)]
pub struct ExportFilter {
    /// Match sources whose `uri` or `path` contains this substring.
    pub path_contains: Option<String>,
    /// Restrict to sources with at least one chunk matching this FTS query.
    /// A query that normalises to an empty FTS expression selects nothing.
    pub query: Option<String>,
}

/// Builds an [`Export`] of every source in `store` that matches `filter`.
///
/// The timestamp is taken once, before any rows are read, and serves both as
/// `exported_at` and as the reference time for chunk confidence scores.
///
/// # Errors
///
/// Returns the first error raised while selecting source ids, loading a
/// source, or reading the store's schema version.
pub fn export(store: &Store, filter: &ExportFilter) -> Result<Export> {
    let conn = store.conn();
    let exported_at = now_unix();

    // One load per matching id; the first failure aborts the whole export.
    let sources = select_source_ids(conn, filter)?
        .iter()
        .map(|source_id| load_source(conn, source_id, exported_at))
        .collect::<Result<Vec<_>>>()?;

    let schema_version = store.schema_version()?;
    let snapshot = FilterSnapshot {
        path: filter.path_contains.clone(),
        query: filter.query.clone(),
    };

    Ok(Export {
        schema_version,
        exported_at,
        filter: snapshot,
        sources,
    })
}

pub fn write_json(export: &Export, output: Option<&Path>) -> Result<()> {
    let json = serde_json::to_string_pretty(export)?;
    match output {
        Some(path) => std::fs::write(path, format!("{json}\n"))
            .with_context(|| format!("writing export to {}", path.display()))?,
        None => println!("{json}"),
    }
    Ok(())
}

/// Resolves `filter` to the ids of the matching sources, ordered by
/// `ingested_at` descending, then `id` descending.
///
/// * `path_contains` is matched as a literal substring of `path` or `uri`
///   using `LIKE`; `%`, `_` and `\` in the filter are escaped so they match
///   themselves instead of acting as wildcards.
/// * `query` is normalised with `search::build_fts_query` and must match at
///   least one chunk of the source in `chunks_fts`.
///
/// # Errors
///
/// Propagates any error from preparing or running the selection query.
fn select_source_ids(conn: &Connection, filter: &ExportFilter) -> Result<Vec<String>> {
    let fts_query = filter.query.as_deref().map(search::build_fts_query);
    if matches!(fts_query.as_deref(), Some("")) {
        // A query that normalises to empty cannot match anything; preserve
        // the "empty filter means no results" contract.
        return Ok(Vec::new());
    }

    let path_like = filter
        .path_contains
        .as_deref()
        .map(|needle| format!("%{}%", escape_like(needle)));

    // Each combination gets its own static SQL so every value stays a bound
    // parameter. `ESCAPE '\'` pairs with `escape_like` above.
    match (path_like.as_deref(), fts_query.as_deref()) {
        (None, None) => collect_ids(
            conn,
            "SELECT id FROM sources ORDER BY ingested_at DESC, id DESC",
            params![],
        ),
        (Some(like), None) => collect_ids(
            conn,
            "SELECT id FROM sources
             WHERE (path LIKE ?1 ESCAPE '\\' OR uri LIKE ?1 ESCAPE '\\')
             ORDER BY ingested_at DESC, id DESC",
            params![like],
        ),
        (None, Some(fts)) => collect_ids(
            conn,
            "SELECT s.id FROM sources s
             WHERE EXISTS (
                SELECT 1 FROM chunks c
                JOIN chunks_fts ON chunks_fts.rowid = c.rowid
                WHERE c.source_id = s.id AND chunks_fts MATCH ?1
             )
             ORDER BY s.ingested_at DESC, s.id DESC",
            params![fts],
        ),
        (Some(like), Some(fts)) => collect_ids(
            conn,
            "SELECT s.id FROM sources s
             WHERE (s.path LIKE ?1 ESCAPE '\\' OR s.uri LIKE ?1 ESCAPE '\\')
               AND EXISTS (
                SELECT 1 FROM chunks c
                JOIN chunks_fts ON chunks_fts.rowid = c.rowid
                WHERE c.source_id = s.id AND chunks_fts MATCH ?2
             )
             ORDER BY s.ingested_at DESC, s.id DESC",
            params![like, fts],
        ),
    }
}

/// Escapes `needle` for use inside a `LIKE ... ESCAPE '\'` pattern so that
/// `%`, `_` and `\` are matched literally.
fn escape_like(needle: &str) -> String {
    let mut escaped = String::with_capacity(needle.len());
    for ch in needle.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}

/// Runs `sql` with the bound `params` and returns the first column of every
/// row as a `String`, in the order the rows are produced.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or executed, or if a row's
/// first column cannot be read as a `String`.
fn collect_ids(
    conn: &Connection,
    sql: &str,
    params: &[&dyn rusqlite::ToSql],
) -> Result<Vec<String>> {
    let mut stmt = conn.prepare(sql)?;
    let mut rows = stmt.query(params)?;

    let mut ids = Vec::new();
    while let Some(row) = rows.next()? {
        let id: String = row.get(0)?;
        ids.push(id);
    }
    Ok(ids)
}

/// Loads source `id` and its chunks with the default [`LoadSourceOptions`]
/// (no entity decoration), scoring chunk confidence as of `as_of_unix`.
pub(crate) fn load_source(conn: &Connection, id: &str, as_of_unix: i64) -> Result<ExportedSource> {
    let default_opts = LoadSourceOptions::default();
    load_source_with_options(conn, id, as_of_unix, default_opts)
}

pub(crate) fn load_source_with_options(
    conn: &Connection,
    id: &str,
    as_of_unix: i64,
    opts: LoadSourceOptions,
) -> Result<ExportedSource> {
    let mut source = conn.query_row(
        "SELECT id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at
         FROM sources WHERE id = ?1",
        params![id],
        |row| {
            Ok(ExportedSource {
                source_id: row.get(0)?,
                uri: row.get(1)?,
                path: row.get(2)?,
                kind: row.get(3)?,
                bytes: row.get(4)?,
                content_sha256: row.get(5)?,
                mtime_unix: row.get(6)?,
                ingested_at: row.get(7)?,
                chunks: Vec::new(),
            })
        },
    )?;

    let mut stmt = conn.prepare(
        "SELECT id, ordinal, byte_start, byte_end, char_count, sha256, text,
                role, session_id, turn_id, parent_turn_id, tool_name, tool_call_id, timestamp_unix,
                access_count, last_accessed_at, access_decay_at, feedback_score,
                query_success_count
         FROM chunks WHERE source_id = ?1 ORDER BY ordinal",
    )?;
    let chunks = stmt.query_map(params![id], |row| {
        let timestamp_unix: Option<i64> = row.get(13)?;
        let access_count: i64 = row.get(14)?;
        let last_accessed_at: Option<i64> = row.get(15)?;
        let access_decay_at: Option<i64> = row.get(16)?;
        let feedback_score: i64 = row.get(17)?;
        let query_success_count: i64 = row.get(18)?;
        let (confidence, confidence_breakdown) = search::compute_confidence_breakdown(
            as_of_unix,
            last_accessed_at,
            timestamp_unix,
            access_count,
            feedback_score,
            query_success_count,
        );

        Ok(ExportedChunk {
            chunk_id: row.get(0)?,
            ordinal: row.get(1)?,
            byte_start: row.get(2)?,
            byte_end: row.get(3)?,
            char_count: row.get(4)?,
            sha256: row.get(5)?,
            text: row.get(6)?,
            role: row.get(7)?,
            session_id: row.get(8)?,
            turn_id: row.get(9)?,
            parent_turn_id: row.get(10)?,
            tool_name: row.get(11)?,
            tool_call_id: row.get(12)?,
            timestamp_unix,
            access_count,
            last_accessed_at,
            access_decay_at,
            feedback_score,
            query_success_count,
            confidence,
            confidence_breakdown,
            entities: None,
        })
    })?;
    source.chunks = chunks.collect::<Result<Vec<_>, _>>()?;

    // Populate chunk-level entities only when the caller opted in. Loading
    // happens after the chunks query closes its borrow on `conn`, so the
    // per-chunk lookup can use a fresh prepared statement without violating
    // rusqlite's borrow rules.
    if let Some(limit) = opts.with_entities.filter(|n| *n > 0) {
        for chunk in &mut source.chunks {
            let entities = load_entities_for_chunk(conn, &chunk.chunk_id, limit)?;
            chunk.entities = Some(entities);
        }
    }

    Ok(source)
}

/// Returns the current wall-clock time as whole seconds since the Unix
/// epoch, or `0` if the system clock reports a time before the epoch.
fn now_unix() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}

#[cfg(test)]
mod tests {
    use super::load_source;
    use crate::search::compute_confidence_breakdown;
    use crate::store::Store;
    use tempfile::tempdir;

    /// Inserts one source with one chunk and checks that `load_source`
    /// surfaces the usage counters and a confidence score identical to what
    /// `compute_confidence_breakdown` yields for the same stored values.
    #[test]
    fn load_source_exports_query_success_count() {
        let dir = tempdir().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let conn = store.conn();
        conn.execute(
            "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
            rusqlite::params![
                "source-1",
                "file:///tmp/source.txt",
                Some("/tmp/source.txt"),
                "text/plain",
                11i64,
                "0123456789abcdef",
                Some(1_700_000_000i64),
                1_700_000_100i64,
            ],
        )
        .unwrap();
        conn.execute(
            "INSERT INTO chunks (
                id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256,
                created_at, access_count, last_accessed_at, access_decay_at,
                feedback_score, query_success_count
             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
            rusqlite::params![
                "chunk-1",
                "source-1",
                0i64,
                0i64,
                11i64,
                11i64,
                "hello world",
                "abcdef0123456789",
                1_700_000_050i64,
                3i64,
                Some(1_700_000_075i64),
                None::<i64>,
                -1i64,
                7i64,
            ],
        )
        .unwrap();

        let source = load_source(conn, "source-1", 1_700_000_200).unwrap();
        assert_eq!(source.chunks.len(), 1);

        let chunk = &source.chunks[0];
        assert_eq!(chunk.query_success_count, 7);
        assert_eq!(chunk.feedback_score, -1);
        assert!(chunk.confidence > 0.0 && chunk.confidence <= 1.0);

        // The fixture sets `created_at` but never `timestamp_unix`, and the
        // loader reads only `timestamp_unix`, so the expected value must be
        // computed with `None` to mirror the inputs the loader actually uses.
        let (expected_confidence, expected_breakdown) = compute_confidence_breakdown(
            1_700_000_200,
            Some(1_700_000_075),
            None,
            3,
            -1,
            7,
        );
        assert!((chunk.confidence - expected_confidence).abs() < 1e-12);

        let breakdown = &chunk.confidence_breakdown;
        assert_eq!(breakdown.freshness, expected_breakdown.freshness);
        assert_eq!(
            breakdown.freshness_source,
            expected_breakdown.freshness_source
        );
        assert_eq!(breakdown.access_boost, expected_breakdown.access_boost);
        assert_eq!(breakdown.base, expected_breakdown.base);
        assert_eq!(
            breakdown.feedback_factor,
            expected_breakdown.feedback_factor
        );
        assert_eq!(
            breakdown.query_success_factor,
            expected_breakdown.query_success_factor
        );
    }
}