use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::collections::HashSet;
/// Category of an entity pulled out of free text by [`extract_entities`].
///
/// Serialized by serde as the lowercased variant name, which is the same
/// string [`EntityKind::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum EntityKind {
/// An `http://` or `https://` URL.
Url,
/// An `owner/repo` slug derived from a two-segment URL path.
Repo,
/// A lowercased host name taken from a URL or from an email address.
Domain,
/// An email address.
Email,
/// A file path, either backtick-quoted or a bare `/`, `./` or `../` token.
FilePath,
/// An `@name` mention.
Mention,
/// A `#tag` hashtag.
Hashtag,
}
impl EntityKind {
pub fn as_str(self) -> &'static str {
match self {
EntityKind::Url => "url",
EntityKind::Repo => "repo",
EntityKind::Domain => "domain",
EntityKind::Email => "email",
EntityKind::FilePath => "filepath",
EntityKind::Mention => "mention",
EntityKind::Hashtag => "hashtag",
}
}
}
/// Relationship type between two entities.
///
/// Serialized by serde in `snake_case`, which is the same string
/// [`EdgeKind::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum EdgeKind {
/// Serialized as `co_occurs_with`.
CoOccursWith,
/// Serialized as `same_session_as`.
SameSessionAs,
}
impl EdgeKind {
pub fn as_str(self) -> &'static str {
match self {
EdgeKind::CoOccursWith => "co_occurs_with",
EdgeKind::SameSessionAs => "same_session_as",
}
}
}
/// A single entity extracted from text: its category plus the matched value.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Entity {
/// Category the value was recognised as.
pub kind: EntityKind,
/// The extracted text (for example the URL, the email address or `@name`).
pub value: String,
}
/// Derives the stable identifier of an entity.
///
/// The id is the hex encoding of the first 16 bytes of
/// `SHA-256("<kind>:<value>")`, i.e. a 32-character lowercase hex string.
pub fn entity_id(kind: EntityKind, value: &str) -> String {
    // Feed the pieces separately so no intermediate string is allocated.
    let parts: [&[u8]; 3] = [kind.as_str().as_bytes(), b":", value.as_bytes()];
    let mut hasher = Sha256::new();
    for part in parts {
        hasher.update(part);
    }
    let digest = hasher.finalize();
    hex::encode(&digest[..16])
}
/// Punctuation stripped from the end of a raw URL match, so that sentence
/// punctuation and closing brackets/quotes do not become part of the URL.
const URL_TRAILING_TRIM: &[char] = &[
',', '.', ';', ':', '!', '?', '\'', '"', ')', ']', '}', '>', '`',
];
/// Punctuation stripped from the end of a raw email match before validation.
const EMAIL_TRAILING_TRIM: &[char] = &[
',', '.', ';', ':', '!', '?', '\'', '"', ')', ']', '}', '>', '`',
];
/// Appends an entity to `out` unless the same `(kind, value)` pair was
/// already recorded in `seen`.
///
/// `seen` is keyed by `"<kind>:<value>"`, so identical values of different
/// kinds are kept as separate entities.
fn push_entity(out: &mut Vec<Entity>, seen: &mut HashSet<String>, kind: EntityKind, value: String) {
    let dedup_key = [kind.as_str(), value.as_str()].join(":");
    if !seen.insert(dedup_key) {
        // Already emitted earlier in this extraction pass.
        return;
    }
    out.push(Entity { kind, value });
}
/// Heuristically decides whether `candidate` reads like a file path.
///
/// Rules, in order:
/// * empty strings and strings containing whitespace are rejected;
/// * anything containing `://` is rejected (it is a URL, not a path);
/// * a candidate containing `/` is accepted when it also contains at least
///   one alphanumeric character;
/// * otherwise the candidate must look like `stem.ext`, where the stem is
///   made of ASCII alphanumerics, `.`, `_` or `-` (with at least one
///   alphanumeric) and the extension is ASCII alphanumeric with at least one
///   letter.
fn looks_like_file_path_literal(candidate: &str) -> bool {
    if candidate.is_empty() || candidate.chars().any(char::is_whitespace) {
        return false;
    }
    if candidate.contains("://") {
        return false;
    }
    if candidate.contains('/') {
        // A separator alone is not a path: without this guard, tokens such as
        // "/", "//" or "/*" (comment markers, "and / or") were reported as
        // file paths.
        return candidate.chars().any(char::is_alphanumeric);
    }
    let Some((stem, ext)) = candidate.rsplit_once('.') else {
        return false;
    };
    let stem_char_ok = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
    !stem.is_empty()
        && !ext.is_empty()
        && stem.chars().any(|c| c.is_ascii_alphanumeric())
        // Requiring a letter in the extension keeps "1.5" / "v1.2" out.
        && ext.chars().any(|c| c.is_ascii_alphabetic())
        && stem.chars().all(stem_char_ok)
        && ext.chars().all(|c| c.is_ascii_alphanumeric())
}
/// Records every backtick-quoted span of `text` that looks like a file path.
///
/// Backticks are paired left to right; scanning stops at an opening backtick
/// that has no closing partner. The quoted text is trimmed before being
/// checked with `looks_like_file_path_literal`.
fn push_file_path_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    let mut remaining = text;
    while let Some((_, after_open)) = remaining.split_once('`') {
        let Some((quoted, after_close)) = after_open.split_once('`') else {
            // Unterminated span: nothing more to pair up.
            break;
        };
        let candidate = quoted.trim();
        if looks_like_file_path_literal(candidate) {
            push_entity(out, seen, EntityKind::FilePath, candidate.to_string());
        }
        remaining = after_close;
    }
}
/// Opening brackets and quotes stripped from the start of a whitespace
/// token before it is tested as a bare file path.
const PATH_TOKEN_LEADING_TRIM: &[char] = &['(', '[', '{', '<', '\'', '"', '`'];
/// Punctuation, closing brackets and quotes stripped from the end of a
/// whitespace token before it is tested as a bare file path.
const PATH_TOKEN_TRAILING_TRIM: &[char] = &[
',', '.', ';', ':', '!', '?', '\'', '"', ')', ']', '}', '>', '`',
];
/// Decides whether an unquoted token should be treated as a file path.
///
/// Stricter than `looks_like_file_path_literal`: the token must additionally
/// be anchored with `./`, `../` or a leading `/`, because bare words are far
/// too ambiguous outside backticks.
fn looks_like_plain_file_path_token(candidate: &str) -> bool {
    const ANCHORS: [&str; 3] = ["./", "../", "/"];

    let has_whitespace = candidate.chars().any(char::is_whitespace);
    if candidate.is_empty() || has_whitespace || candidate.contains("://") {
        return false;
    }
    let anchored = ANCHORS.iter().any(|anchor| candidate.starts_with(*anchor));
    anchored && looks_like_file_path_literal(candidate)
}
/// Records whitespace-separated tokens of `text` that look like anchored
/// file paths (`/…`, `./…`, `../…`).
///
/// Surrounding brackets, quotes and sentence punctuation are stripped from
/// each token before it is tested.
fn push_plain_file_path_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    let paths = text
        .split_whitespace()
        .map(|token| {
            token
                .trim_start_matches(PATH_TOKEN_LEADING_TRIM)
                .trim_end_matches(PATH_TOKEN_TRAILING_TRIM)
        })
        .filter(|candidate| looks_like_plain_file_path_token(candidate));
    for path in paths {
        push_entity(out, seen, EntityKind::FilePath, path.to_string());
    }
}
/// Returns `true` for bytes allowed in the local part (before `@`) of an
/// email address: ASCII letters, digits and `.`, `_`, `%`, `+`, `-`.
fn is_email_local_char(byte: u8) -> bool {
    matches!(
        byte,
        b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'.' | b'_' | b'%' | b'+' | b'-'
    )
}
/// Returns `true` for bytes allowed in the domain part (after `@`) of an
/// email address: ASCII letters, digits, `.` and `-`.
fn is_email_domain_char(byte: u8) -> bool {
    matches!(
        byte,
        b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'.' | b'-'
    )
}
/// Validates an email candidate of the form `local@domain`.
///
/// Accepted when all of the following hold:
/// * both sides of the first `@` are non-empty and neither starts nor ends
///   with a dot;
/// * the local part only uses `is_email_local_char` bytes and the domain only
///   uses `is_email_domain_char` bytes;
/// * the domain has at least two non-empty labels, no label before the last
///   one starts or ends with `-`, and the last label (TLD) is at least two
///   ASCII letters.
fn is_valid_email(candidate: &str) -> bool {
    let Some((local, domain)) = candidate.split_once('@') else {
        return false;
    };
    let dot_at_edge = |part: &str| part.starts_with('.') || part.ends_with('.');
    if local.is_empty() || domain.is_empty() || dot_at_edge(local) || dot_at_edge(domain) {
        return false;
    }
    if !local.bytes().all(is_email_local_char) || !domain.bytes().all(is_email_domain_char) {
        return false;
    }

    let labels: Vec<&str> = domain.split('.').collect();
    let Some((tld, parents)) = labels.split_last() else {
        return false;
    };
    // `parents` being empty means the domain had a single label.
    if parents.is_empty() || labels.iter().any(|label| label.is_empty()) {
        return false;
    }
    if parents
        .iter()
        .any(|label| label.starts_with('-') || label.ends_with('-'))
    {
        return false;
    }
    tld.len() >= 2 && tld.chars().all(|c| c.is_ascii_alphabetic())
}
/// Records every `https://` / `http://` URL found in `text`, together with
/// the repo slug and domain derived from it.
///
/// A URL runs from the scheme up to the next whitespace; trailing
/// punctuation listed in `URL_TRAILING_TRIM` is dropped. Matches that are
/// nothing but the scheme are ignored. All `https://` matches are emitted
/// before any `http://` match.
fn push_url_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    for scheme in ["https://", "http://"] {
        for (start, _) in text.match_indices(scheme) {
            let tail = &text[start..];
            let url = tail
                .split(char::is_whitespace)
                .next()
                .unwrap_or(tail)
                .trim_end_matches(URL_TRAILING_TRIM);
            if url.len() <= scheme.len() {
                continue;
            }
            push_entity(out, seen, EntityKind::Url, url.to_string());
            push_repo_entity_from_url(url, out, seen);
            push_domain_entity_from_url(url, out, seen);
        }
    }
}
/// Extracts the host of `url` and records it as a `Domain` entity when
/// `normalize_domain` accepts it.
///
/// The scheme, any `user@` prefix, any `:port` suffix and everything from the
/// first `/`, `?` or `#` onwards are discarded before normalisation.
fn push_domain_entity_from_url(url: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    let without_scheme = ["https://", "http://"]
        .iter()
        .find_map(|scheme| url.strip_prefix(*scheme))
        .unwrap_or(url);
    let authority_end = without_scheme
        .find(['/', '?', '#'])
        .unwrap_or(without_scheme.len());
    let authority = &without_scheme[..authority_end];
    if authority.is_empty() {
        return;
    }
    // Drop userinfo ("user:pass@") using the last '@'.
    let host_with_port = match authority.rsplit_once('@') {
        Some((_, host)) => host,
        None => authority,
    };
    // Drop the port using the last ':'.
    let host = match host_with_port.rsplit_once(':') {
        Some((name, _port)) => name,
        None => host_with_port,
    };
    if let Some(domain) = normalize_domain(host) {
        push_entity(out, seen, EntityKind::Domain, domain);
    }
}
/// Validates `host` as a dotted domain name and returns it lowercased.
///
/// Returns `None` when the host is empty, contains anything other than ASCII
/// letters, digits, `.` or `-`, has fewer than two labels, has an empty label
/// (leading, trailing or doubled dot), or has a label that starts or ends
/// with `-`.
fn normalize_domain(host: &str) -> Option<String> {
    let charset_ok = host
        .bytes()
        .all(|b| matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'.' | b'-'));
    if !charset_ok {
        return None;
    }
    let mut label_count = 0usize;
    for label in host.split('.') {
        // An empty label also covers the empty host and leading/trailing dots.
        if label.is_empty() || label.starts_with('-') || label.ends_with('-') {
            return None;
        }
        label_count += 1;
    }
    (label_count >= 2).then(|| host.to_ascii_lowercase())
}
/// Returns `true` when `segment` could be one half of an `owner/repo` slug:
/// only ASCII letters, digits, `.`, `_` and `-`, with at least one letter.
fn looks_like_repo_slug_segment(segment: &str) -> bool {
    let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-');
    // `any` is false for the empty string, so empty segments are rejected too.
    segment.chars().all(allowed) && segment.chars().any(|c| c.is_ascii_alphabetic())
}
/// Records an `owner/repo` entity when the path of `url` consists of exactly
/// two non-empty segments that both look like slug parts.
///
/// The query string and fragment are ignored, empty path segments (doubled or
/// trailing slashes) are skipped, and a trailing `.git` on the second segment
/// is removed.
fn push_repo_entity_from_url(url: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    let without_scheme = url
        .strip_prefix("https://")
        .or_else(|| url.strip_prefix("http://"))
        .unwrap_or(url);
    let Some((_host, path)) = without_scheme.split_once('/') else {
        return;
    };
    let path_end = path.find(['?', '#']).unwrap_or(path.len());
    let segments: Vec<&str> = path[..path_end]
        .split('/')
        .filter(|segment| !segment.is_empty())
        .collect();
    // Anything shorter or deeper than `owner/repo` is not treated as a repo.
    let &[owner, repo] = segments.as_slice() else {
        return;
    };
    let repo = repo.strip_suffix(".git").unwrap_or(repo);
    if looks_like_repo_slug_segment(owner) && looks_like_repo_slug_segment(repo) {
        push_entity(out, seen, EntityKind::Repo, format!("{owner}/{repo}"));
    }
}
/// Returns `true` for bytes that may appear in the body of an `@mention`:
/// ASCII letters, digits, `_`, `-` and `.`.
fn is_mention_body_char(byte: u8) -> bool {
    matches!(
        byte,
        b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-' | b'.'
    )
}
/// Characters stripped from the end of a raw mention or hashtag body, so
/// that trailing sentence punctuation is not kept as part of the name.
const MENTION_TRAILING_TRIM: &[char] = &['.', '-', '_'];
fn push_mention_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
let bytes = text.as_bytes();
for (idx, byte) in bytes.iter().enumerate() {
if *byte != b'@' {
continue;
}
if idx > 0 && is_email_local_char(bytes[idx - 1]) {
continue;
}
let body_start = idx + 1;
let mut end = body_start;
while end < bytes.len() && is_mention_body_char(bytes[end]) {
end += 1;
}
let raw = &text[body_start..end];
let trimmed = raw.trim_end_matches(MENTION_TRAILING_TRIM);
if trimmed.len() < 2 {
continue;
}
let first = trimmed.as_bytes()[0];
if !first.is_ascii_alphanumeric() && first != b'_' {
continue;
}
if !trimmed.bytes().any(|b| b.is_ascii_alphabetic()) {
continue;
}
push_entity(out, seen, EntityKind::Mention, format!("@{trimmed}"));
}
}
/// Returns `true` for bytes that may appear in the body of a `#hashtag`:
/// ASCII letters, digits, `_`, `-` and `.`.
fn is_hashtag_body_char(byte: u8) -> bool {
    matches!(
        byte,
        b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-' | b'.'
    )
}
/// Records every `#tag` hashtag in `text`.
///
/// A `#` is skipped when it directly follows an ASCII alphanumeric or `_`
/// (mid-word), or when the whitespace-delimited token it sits in already
/// contains `://` before it (a URL fragment). The tag is the following run
/// of `is_hashtag_body_char` bytes with trailing `.`, `-`, `_` removed; it
/// must be at least two bytes long, start with an ASCII alphanumeric or `_`,
/// and contain at least one ASCII letter. The stored value keeps the `#`.
fn push_hashtag_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    for (hash_at, _) in text.match_indices('#') {
        let before = &text[..hash_at];
        let glued_to_word = before
            .bytes()
            .next_back()
            .is_some_and(|b| b.is_ascii_alphanumeric() || b == b'_');
        if glued_to_word {
            continue;
        }
        // Part of the current token that precedes the '#'.
        let token_prefix = before
            .rsplit(|c: char| c.is_ascii_whitespace())
            .next()
            .unwrap_or(before);
        if token_prefix.contains("://") {
            continue;
        }
        let body = &text[hash_at + 1..];
        let run_len = body
            .bytes()
            .take_while(|b| is_hashtag_body_char(*b))
            .count();
        let tag = body[..run_len].trim_end_matches(MENTION_TRAILING_TRIM);
        let leads_ok = tag
            .bytes()
            .next()
            .is_some_and(|b| b.is_ascii_alphanumeric() || b == b'_');
        let has_letter = tag.bytes().any(|b| b.is_ascii_alphabetic());
        if tag.len() < 2 || !leads_ok || !has_letter {
            continue;
        }
        push_entity(out, seen, EntityKind::Hashtag, format!("#{tag}"));
    }
}
/// Records every valid email address in `text`, plus its domain.
///
/// For each `@`, the candidate is the longest run of local-part bytes to its
/// left joined with the longest run of domain bytes to its right, with
/// trailing punctuation trimmed. Candidates that pass `is_valid_email` are
/// stored as `Email`; their domain is stored as `Domain` when
/// `normalize_domain` accepts it.
fn push_email_entities(text: &str, out: &mut Vec<Entity>, seen: &mut HashSet<String>) {
    let bytes = text.as_bytes();
    for (at, _) in text.match_indices('@') {
        let local_len = bytes[..at]
            .iter()
            .rev()
            .take_while(|b| is_email_local_char(**b))
            .count();
        let domain_len = bytes[at + 1..]
            .iter()
            .take_while(|b| is_email_domain_char(**b))
            .count();
        if local_len == 0 || domain_len == 0 {
            continue;
        }
        let candidate =
            text[at - local_len..at + 1 + domain_len].trim_end_matches(EMAIL_TRAILING_TRIM);
        if !is_valid_email(candidate) {
            continue;
        }
        push_entity(out, seen, EntityKind::Email, candidate.to_string());
        let domain = candidate
            .split_once('@')
            .and_then(|(_, host)| normalize_domain(host));
        if let Some(domain) = domain {
            push_entity(out, seen, EntityKind::Domain, domain);
        }
    }
}
pub fn extract_entities(text: &str) -> Vec<Entity> {
let mut out: Vec<Entity> = Vec::new();
let mut seen: HashSet<String> = HashSet::new();
push_url_entities(text, &mut out, &mut seen);
push_email_entities(text, &mut out, &mut seen);
push_mention_entities(text, &mut out, &mut seen);
push_hashtag_entities(text, &mut out, &mut seen);
push_file_path_entities(text, &mut out, &mut seen);
push_plain_file_path_entities(text, &mut out, &mut seen);
out
}
/// Parses an entity kind name, ignoring ASCII case.
///
/// # Errors
///
/// Returns an error listing the accepted names when `raw` is not one of
/// them.
pub fn entity_kind_from_str(raw: &str) -> Result<EntityKind> {
    const ALL_KINDS: [EntityKind; 7] = [
        EntityKind::Url,
        EntityKind::Repo,
        EntityKind::Domain,
        EntityKind::Email,
        EntityKind::FilePath,
        EntityKind::Mention,
        EntityKind::Hashtag,
    ];

    let lowered = raw.to_ascii_lowercase();
    ALL_KINDS
        .iter()
        .copied()
        .find(|kind| kind.as_str() == lowered)
        .ok_or_else(|| {
            anyhow::anyhow!(
                "unknown entity kind '{other}' (expected: url, repo, domain, email, filepath, mention, hashtag)",
                other = lowered
            )
        })
}
/// Filters and expansion switches accepted by [`list_entities`].
#[derive(Debug, Clone, Default)]
pub struct EntityListOptions {
/// Only return entities of this kind.
pub kind: Option<EntityKind>,
/// Only return entities whose value contains this substring. Matched with
/// SQL `LIKE`; `%`, `_` and `\` in the needle are escaped so they match
/// literally.
pub value_contains: Option<String>,
/// Maximum number of entries to return; `None` leaves the listing uncapped.
pub limit: Option<usize>,
/// When set, attach up to this many chunk references to each entry.
pub with_chunks: Option<usize>,
/// Restrict matching and counting to chunks recorded under this session id.
pub session_id: Option<String>,
/// When set to a non-zero value, attach up to this many session ids per entry.
pub with_sessions: Option<usize>,
/// When set to a non-zero value, attach up to this many project names per entry.
pub with_projects: Option<usize>,
/// When set to a non-zero value, attach up to this many user names per entry.
pub with_users: Option<usize>,
/// When set to a non-zero value, attach up to this many topics per entry.
pub with_topics: Option<usize>,
}
/// One row of an [`EntityListReport`]: an entity plus its usage counts and
/// any optional expansions that were requested.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct EntityListEntry {
/// Entity id as stored in the `entities` table.
pub id: String,
/// Parsed entity kind.
pub kind: EntityKind,
/// Entity value as stored.
pub value: String,
/// Number of chunks that reference the entity.
pub chunk_count: i64,
/// Number of distinct session ids among those chunks.
pub session_count: i64,
/// Number of distinct projects among those chunks.
pub project_count: i64,
/// Number of distinct users among those chunks.
pub user_count: i64,
/// Number of distinct topics among those chunks.
pub topic_count: i64,
/// Value of the `created_at` column (unit not visible here; TODO confirm).
pub created_at: i64,
/// Chunk references; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub chunks: Option<Vec<EntityChunkRef>>,
/// Session ids; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_ids: Option<Vec<String>>,
/// Project names; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub projects: Option<Vec<String>>,
/// User names; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub users: Option<Vec<String>>,
/// Topics; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub topics: Option<Vec<String>>,
}
/// Lightweight reference to a chunk that mentions an entity.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct EntityChunkRef {
/// Id of the chunk (`chunks.id`).
pub chunk_id: String,
/// Id of the source the chunk belongs to (`chunks.source_id`).
pub source_id: String,
/// URI of that source (`sources.uri`).
pub source_uri: String,
/// Whitespace-collapsed, length-capped excerpt of the chunk text.
pub snippet: String,
}
/// Maximum number of characters (not bytes) kept in a chunk snippet before
/// an ellipsis is appended.
const CHUNK_SNIPPET_CHARS: usize = 160;
/// Result of [`list_entities`]: the returned page plus an echo of the
/// filters that produced it.
#[derive(Debug, Clone, Serialize)]
pub struct EntityListReport {
/// Entries in result order, at most `limit` of them.
pub entries: Vec<EntityListEntry>,
/// Number of entities matching the filters, ignoring `limit`.
pub total_matched: i64,
/// Kind filter that was applied, if any.
pub kind_filter: Option<EntityKind>,
/// Substring filter that was applied, if any.
pub value_contains: Option<String>,
/// Session filter that was applied; omitted from serialized output when unset.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
}
/// Lists entities matching `opts`, most frequently referenced first.
///
/// Entries are ordered by chunk count (descending), then value, then id.
/// `total_matched` is computed with the same filters but without the limit.
///
/// When `opts.session_id` is set, entities are joined to their chunks and
/// only chunks of that session are matched and counted. Otherwise the counts
/// come from correlated subqueries over all of the entity's chunks.
///
/// Optional expansions (`with_chunks`, `with_sessions`, `with_projects`,
/// `with_users`, `with_topics`) run one extra query per returned entry.
///
/// # Errors
///
/// Returns an error when a query fails to prepare or run, or when a stored
/// kind string is not recognised by `entity_kind_from_str`.
pub fn list_entities(conn: &Connection, opts: &EntityListOptions) -> Result<EntityListReport> {
let mut where_clauses: Vec<&str> = Vec::new();
let kind_str = opts.kind.map(|k| k.as_str().to_string());
// Escape LIKE metacharacters so the needle is matched literally; the
// backslash is declared as the escape character in the clause below.
let like_pattern = opts.value_contains.as_deref().map(|s| {
format!(
"%{}%",
s.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
)
});
let session_id = opts.session_id.as_deref();
// Clause order (kind, value, session) must match the order in which the
// positional parameters are pushed into `total_params` further down.
if kind_str.is_some() {
where_clauses.push("e.kind = ?");
}
if like_pattern.is_some() {
where_clauses.push("e.value LIKE ? ESCAPE '\\'");
}
if session_id.is_some() {
where_clauses.push("c.session_id = ?");
}
let where_sql = if where_clauses.is_empty() {
String::new()
} else {
format!("WHERE {}", where_clauses.join(" AND "))
};
// The session-scoped variant joins chunks so the WHERE clause can see
// `c.session_id`; the unscoped variant counts through correlated subqueries.
let (total_sql, list_sql) = if session_id.is_some() {
(
format!(
"SELECT COUNT(*) FROM (
SELECT e.id
FROM entities e
JOIN chunk_entities ce ON ce.entity_id = e.id
JOIN chunks c ON c.id = ce.chunk_id
{where_sql}
GROUP BY e.id
)"
),
format!(
"SELECT e.id, e.kind, e.value, e.created_at,
COUNT(DISTINCT ce.chunk_id) AS chunk_count,
COUNT(DISTINCT c.session_id) AS session_count,
COUNT(DISTINCT c.project) AS project_count,
COUNT(DISTINCT c.user) AS user_count,
COUNT(DISTINCT c.topic) AS topic_count
FROM entities e
JOIN chunk_entities ce ON ce.entity_id = e.id
JOIN chunks c ON c.id = ce.chunk_id
{where_sql}
GROUP BY e.id, e.kind, e.value, e.created_at
ORDER BY chunk_count DESC, e.value ASC, e.id ASC
LIMIT ?"
),
)
} else {
(
format!("SELECT COUNT(*) FROM entities e {where_sql}"),
format!(
"SELECT e.id, e.kind, e.value, e.created_at,
(SELECT COUNT(*) FROM chunk_entities ce WHERE ce.entity_id = e.id) AS chunk_count,
(SELECT COUNT(DISTINCT c.session_id)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = e.id
AND c.session_id IS NOT NULL) AS session_count,
(SELECT COUNT(DISTINCT c.project)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = e.id
AND c.project IS NOT NULL) AS project_count,
(SELECT COUNT(DISTINCT c.user)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = e.id
AND c.user IS NOT NULL) AS user_count,
(SELECT COUNT(DISTINCT c.topic)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = e.id
AND c.topic IS NOT NULL) AS topic_count
FROM entities e
{where_sql}
ORDER BY chunk_count DESC, e.value ASC, e.id ASC
LIMIT ?"
),
)
};
// Positional parameters, in the same order as the WHERE clauses above.
let mut total_params: Vec<rusqlite::types::Value> = Vec::new();
if let Some(k) = &kind_str {
total_params.push(k.clone().into());
}
if let Some(p) = &like_pattern {
total_params.push(p.clone().into());
}
if let Some(s) = session_id {
total_params.push(s.to_string().into());
}
let total_param_refs: Vec<&dyn rusqlite::ToSql> = total_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let total_matched: i64 = conn
.query_row(&total_sql, total_param_refs.as_slice(), |row| row.get(0))
.with_context(|| format!("counting entities matching {opts:?}"))?;
// No limit requested means "as many as SQLite can express"; the clamp
// keeps the value inside the i64 range before the cast.
let limit = opts.limit.unwrap_or(usize::MAX).min(i64::MAX as usize) as i64;
// The list query takes the same filter parameters plus the trailing LIMIT.
let mut list_params = total_params.clone();
list_params.push(limit.into());
let list_param_refs: Vec<&dyn rusqlite::ToSql> = list_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let mut stmt = conn
.prepare(&list_sql)
.context("preparing list_entities query")?;
let rows = stmt
.query_map(list_param_refs.as_slice(), |row| {
let kind_raw: String = row.get(1)?;
// Surface an unknown stored kind as a conversion failure on column 1.
let kind = entity_kind_from_str(&kind_raw).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
1,
rusqlite::types::Type::Text,
Box::new(std::io::Error::other(e.to_string())),
)
})?;
Ok(EntityListEntry {
id: row.get(0)?,
kind,
value: row.get(2)?,
created_at: row.get(3)?,
chunk_count: row.get(4)?,
session_count: row.get(5)?,
project_count: row.get(6)?,
user_count: row.get(7)?,
topic_count: row.get(8)?,
chunks: None,
session_ids: None,
projects: None,
users: None,
topics: None,
})
})
.context("running list_entities query")?;
let mut entries = rows.collect::<Result<Vec<_>, _>>()?;
// Unlike the expansions below, `with_chunks` is honoured even when it is
// `Some(0)`: `load_chunk_refs` then yields an empty list rather than `None`.
if let Some(per_entity_limit) = opts.with_chunks {
for entry in &mut entries {
entry.chunks = Some(load_chunk_refs(
conn,
&entry.id,
per_entity_limit,
session_id,
)?);
}
}
// The remaining expansions are skipped entirely when the limit is zero.
if let Some(per_entity_limit) = opts.with_sessions.filter(|n| *n > 0) {
for entry in &mut entries {
entry.session_ids = Some(load_entity_session_ids(
conn,
&entry.id,
per_entity_limit,
session_id,
)?);
}
}
if let Some(per_entity_limit) = opts.with_projects.filter(|n| *n > 0) {
for entry in &mut entries {
entry.projects = Some(load_entity_projects(
conn,
&entry.id,
per_entity_limit,
session_id,
)?);
}
}
if let Some(per_entity_limit) = opts.with_users.filter(|n| *n > 0) {
for entry in &mut entries {
entry.users = Some(load_entity_users(
conn,
&entry.id,
per_entity_limit,
session_id,
)?);
}
}
if let Some(per_entity_limit) = opts.with_topics.filter(|n| *n > 0) {
for entry in &mut entries {
entry.topics = Some(load_entity_topics(
conn,
&entry.id,
per_entity_limit,
session_id,
)?);
}
}
// Echo the applied filters back so callers can render them with the result.
Ok(EntityListReport {
entries,
total_matched,
kind_filter: opts.kind,
value_contains: opts.value_contains.clone(),
session_id: opts.session_id.clone(),
})
}
fn load_chunk_refs(
conn: &Connection,
entity_id: &str,
limit: usize,
session_id: Option<&str>,
) -> Result<Vec<EntityChunkRef>> {
if limit == 0 {
return Ok(Vec::new());
}
if let Some(session) = session_id {
let mut stmt = conn
.prepare(
"SELECT c.id, c.source_id, s.uri, c.text
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
JOIN sources s ON s.id = c.source_id
WHERE ce.entity_id = ?1
AND c.session_id = ?2
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?3",
)
.context("preparing session-scoped chunks-for-entity query")?;
let rows = stmt
.query_map(params![entity_id, session, limit as i64], |row| {
build_chunk_ref(row)
})
.context("running session-scoped chunks-for-entity query")?;
return Ok(rows.collect::<Result<Vec<_>, _>>()?);
}
let mut stmt = conn
.prepare(
"SELECT c.id, c.source_id, s.uri, c.text
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
JOIN sources s ON s.id = c.source_id
WHERE ce.entity_id = ?1
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?2",
)
.context("preparing chunks-for-entity query")?;
let rows = stmt
.query_map(params![entity_id, limit as i64], |row| build_chunk_ref(row))
.context("running chunks-for-entity query")?;
let refs = rows.collect::<Result<Vec<_>, _>>()?;
Ok(refs)
}
/// An entity attached to a particular chunk, as returned by
/// [`load_entities_for_chunk`].
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct ChunkEntity {
/// Entity id as stored in the `entities` table.
pub id: String,
/// Parsed entity kind.
pub kind: EntityKind,
/// Entity value as stored.
pub value: String,
}
/// Loads up to `limit` entities attached to the chunk `chunk_id`, ordered by
/// kind, then value, then id.
///
/// A `limit` of zero returns an empty list without touching the database.
///
/// # Errors
///
/// Returns an error when the query fails to prepare or run, or when a stored
/// kind string is not recognised by `entity_kind_from_str`.
pub fn load_entities_for_chunk(
    conn: &Connection,
    chunk_id: &str,
    limit: usize,
) -> Result<Vec<ChunkEntity>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    // Clamp before casting: a bare `as i64` wraps huge limits to a negative
    // number, which only works because SQLite reads that as "no limit".
    let limit = limit.min(i64::MAX as usize) as i64;
    let mut stmt = conn
        .prepare(
            "SELECT e.id, e.kind, e.value
FROM chunk_entities ce
JOIN entities e ON e.id = ce.entity_id
WHERE ce.chunk_id = ?1
ORDER BY e.kind ASC, e.value ASC, e.id ASC
LIMIT ?2",
        )
        .context("preparing entities-for-chunk query")?;
    let rows = stmt
        .query_map(params![chunk_id, limit], |row| {
            let id: String = row.get(0)?;
            let kind_raw: String = row.get(1)?;
            let value: String = row.get(2)?;
            // Surface an unknown stored kind as a conversion failure on column 1.
            let kind = entity_kind_from_str(&kind_raw).map_err(|e| {
                rusqlite::Error::FromSqlConversionFailure(
                    1,
                    rusqlite::types::Type::Text,
                    Box::new(std::io::Error::other(e.to_string())),
                )
            })?;
            Ok(ChunkEntity { id, kind, value })
        })
        .context("running entities-for-chunk query")?;
    Ok(rows.collect::<Result<Vec<_>, _>>()?)
}
fn load_shared_chunk_refs(
conn: &Connection,
source_entity_id: &str,
neighbor_entity_id: &str,
limit: usize,
session_id: Option<&str>,
) -> Result<Vec<EntityChunkRef>> {
if limit == 0 {
return Ok(Vec::new());
}
if let Some(session) = session_id {
let mut stmt = conn
.prepare(
"SELECT c.id, c.source_id, s.uri, c.text
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
JOIN sources s ON s.id = c.source_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.session_id = ?3
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?4",
)
.context("preparing session-scoped shared-chunks-for-neighbors query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, session, limit as i64],
|row| build_chunk_ref(row),
)
.context("running session-scoped shared-chunks-for-neighbors query")?;
return Ok(rows.collect::<Result<Vec<_>, _>>()?);
}
let mut stmt = conn
.prepare(
"SELECT c.id, c.source_id, s.uri, c.text
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
JOIN sources s ON s.id = c.source_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?3",
)
.context("preparing shared-chunks-for-neighbors query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, limit as i64],
|row| build_chunk_ref(row),
)
.context("running shared-chunks-for-neighbors query")?;
let refs = rows.collect::<Result<Vec<_>, _>>()?;
Ok(refs)
}
fn load_shared_chunk_session_ids(
conn: &Connection,
source_entity_id: &str,
neighbor_entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.session_id
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.session_id = ?3
ORDER BY c.session_id ASC
LIMIT ?4",
)
.context("preparing session-scoped shared-chunk-session-ids query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, session, limit],
|row| row.get::<_, String>(0),
)
.context("running session-scoped shared-chunk-session-ids query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.session_id
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.session_id IS NOT NULL
ORDER BY c.session_id ASC
LIMIT ?3",
)
.context("preparing shared-chunk-session-ids query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, limit],
|row| row.get::<_, String>(0),
)
.context("running shared-chunk-session-ids query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_shared_chunk_projects(
conn: &Connection,
source_entity_id: &str,
neighbor_entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.project
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.session_id = ?3
AND c.project IS NOT NULL
ORDER BY c.project ASC
LIMIT ?4",
)
.context("preparing session-scoped shared-chunk-projects query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, session, limit],
|row| row.get::<_, String>(0),
)
.context("running session-scoped shared-chunk-projects query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.project
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.project IS NOT NULL
ORDER BY c.project ASC
LIMIT ?3",
)
.context("preparing shared-chunk-projects query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, limit],
|row| row.get::<_, String>(0),
)
.context("running shared-chunk-projects query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_shared_chunk_users(
conn: &Connection,
source_entity_id: &str,
neighbor_entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.user
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.session_id = ?3
AND c.user IS NOT NULL
ORDER BY c.user ASC
LIMIT ?4",
)
.context("preparing session-scoped shared-chunk-users query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, session, limit],
|row| row.get::<_, String>(0),
)
.context("running session-scoped shared-chunk-users query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.user
FROM chunk_entities ce_source
JOIN chunk_entities ce_neighbor ON ce_neighbor.chunk_id = ce_source.chunk_id
JOIN chunks c ON c.id = ce_source.chunk_id
WHERE ce_source.entity_id = ?1
AND ce_neighbor.entity_id = ?2
AND c.user IS NOT NULL
ORDER BY c.user ASC
LIMIT ?3",
)
.context("preparing shared-chunk-users query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, limit],
|row| row.get::<_, String>(0),
)
.context("running shared-chunk-users query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Builds an [`EntityChunkRef`] from a row shaped as
/// `(chunk id, source id, source uri, chunk text)`.
///
/// The chunk text (column 3) is read first and reduced to a snippet.
fn build_chunk_ref(row: &rusqlite::Row<'_>) -> rusqlite::Result<EntityChunkRef> {
    let snippet = snippet_for(&row.get::<_, String>(3)?);
    let chunk_id = row.get(0)?;
    let source_id = row.get(1)?;
    let source_uri = row.get(2)?;
    Ok(EntityChunkRef {
        chunk_id,
        source_id,
        source_uri,
        snippet,
    })
}
fn snippet_for(text: &str) -> String {
let collapsed: String = text.split_whitespace().collect::<Vec<_>>().join(" ");
let mut out = String::new();
for ch in collapsed.chars().take(CHUNK_SNIPPET_CHARS) {
out.push(ch);
}
if collapsed.chars().count() > CHUNK_SNIPPET_CHARS {
out.push('…');
}
out
}
/// Filters and expansion switches accepted by [`entity_neighbors`].
#[derive(Debug, Clone, Default)]
pub struct EntityNeighborsOptions {
/// Only return neighbors of this kind.
pub kind: Option<EntityKind>,
/// Maximum number of neighbors to return; `None` leaves the listing uncapped.
pub limit: Option<usize>,
/// When set, attach up to this many shared chunk references per neighbor.
pub with_chunks: Option<usize>,
/// Restrict co-occurrence matching and counting to chunks of this session.
pub session_id: Option<String>,
/// When set to a non-zero value, attach up to this many shared session ids
/// per neighbor.
pub with_sessions: Option<usize>,
/// Presumably the per-neighbor cap for shared project names; its use is
/// outside the visible part of the file. TODO confirm.
pub with_projects: Option<usize>,
/// Presumably the per-neighbor cap for shared user names; its use is
/// outside the visible part of the file. TODO confirm.
pub with_users: Option<usize>,
}
/// An entity that shares at least one chunk with the source entity of an
/// [`EntityNeighborsReport`].
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct EntityNeighbor {
/// Neighbor entity id.
pub id: String,
/// Parsed neighbor entity kind.
pub kind: EntityKind,
/// Neighbor entity value.
pub value: String,
/// How the neighbor relates to the source entity.
pub edge_kind: EdgeKind,
/// Number of distinct chunks that mention both the source and this neighbor.
pub shared_chunks: i64,
/// Number of chunks that mention this neighbor (within the session filter,
/// when one is set).
pub chunk_count: i64,
/// Number of distinct sessions among the neighbor's chunks.
pub session_count: i64,
/// Number of distinct projects among the neighbor's chunks.
pub project_count: i64,
/// Number of distinct users among the neighbor's chunks.
pub user_count: i64,
/// Shared chunk references; omitted from serialized output when not requested.
#[serde(skip_serializing_if = "Option::is_none")]
pub chunks: Option<Vec<EntityChunkRef>>,
/// Session ids of shared chunks; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub shared_session_ids: Option<Vec<String>>,
/// Project names of shared chunks; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub shared_projects: Option<Vec<String>>,
/// User names of shared chunks; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub shared_users: Option<Vec<String>>,
}
/// Result of [`entity_neighbors`]: the source entity, its neighbors and an
/// echo of the filters that were applied.
#[derive(Debug, Clone, Serialize)]
pub struct EntityNeighborsReport {
/// Id of the entity whose neighbors were requested.
pub source_id: String,
/// Kind of the source entity.
pub source_kind: EntityKind,
/// Value of the source entity.
pub source_value: String,
/// Neighbors in result order, at most `limit` of them.
pub neighbors: Vec<EntityNeighbor>,
/// Number of neighbors matching the filters, ignoring `limit`.
pub total_neighbors: i64,
/// Kind filter that was applied, if any.
pub kind_filter: Option<EntityKind>,
/// Session filter that was applied; omitted from serialized output when unset.
#[serde(skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
}
pub fn entity_neighbors(
conn: &Connection,
source_id: &str,
opts: &EntityNeighborsOptions,
) -> Result<EntityNeighborsReport> {
let (source_kind_raw, source_value) = conn
.query_row(
"SELECT kind, value FROM entities WHERE id = ?1",
params![source_id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
)
.map_err(|err| match err {
rusqlite::Error::QueryReturnedNoRows => {
anyhow::anyhow!("no entity with id '{source_id}'")
}
other => anyhow::Error::from(other).context(format!("looking up entity {source_id}")),
})?;
let source_kind = entity_kind_from_str(&source_kind_raw)?;
let kind_str = opts.kind.map(|k| k.as_str().to_string());
let session_str = opts.session_id.as_deref();
let session_pos: Option<usize> = session_str.map(|_| 2);
let kind_pos: Option<usize> = if kind_str.is_some() {
Some(if session_pos.is_some() { 3 } else { 2 })
} else {
None
};
let limit_pos: usize = match (session_pos.is_some(), kind_pos.is_some()) {
(true, true) => 4,
(true, false) => 3,
(false, true) => 3,
(false, false) => 2,
};
let session_join = if session_pos.is_some() {
" JOIN chunks c ON c.id = ce_other.chunk_id "
} else {
""
};
let session_where = match session_pos {
Some(p) => format!(" AND c.session_id = ?{p}"),
None => String::new(),
};
let kind_clause = match kind_pos {
Some(p) => format!(" AND e.kind = ?{p}"),
None => String::new(),
};
let chunk_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(*) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p})"
),
None => "(SELECT COUNT(*) FROM chunk_entities ce2 WHERE ce2.entity_id = e.id)".to_string(),
};
let session_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.session_id) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p})"
),
None => "(SELECT COUNT(DISTINCT c2.session_id) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id IS NOT NULL)"
.to_string(),
};
let project_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.project) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p}
AND c2.project IS NOT NULL)"
),
None => "(SELECT COUNT(DISTINCT c2.project) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.project IS NOT NULL)"
.to_string(),
};
let user_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.user) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p}
AND c2.user IS NOT NULL)"
),
None => "(SELECT COUNT(DISTINCT c2.user) FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.user IS NOT NULL)"
.to_string(),
};
let total_sql = format!(
"SELECT COUNT(*) FROM (
SELECT e.id
FROM entities e
JOIN chunk_entities ce_other ON ce_other.entity_id = e.id
JOIN chunk_entities ce_self ON ce_self.chunk_id = ce_other.chunk_id
{session_join}
WHERE ce_self.entity_id = ?1
AND e.id != ?1{session_where}{kind_clause}
GROUP BY e.id
)"
);
let list_sql = format!(
"SELECT e.id, e.kind, e.value,
COUNT(DISTINCT ce_other.chunk_id) AS shared_chunks,
{chunk_count_subquery} AS chunk_count,
{session_count_subquery} AS session_count,
{project_count_subquery} AS project_count,
{user_count_subquery} AS user_count
FROM entities e
JOIN chunk_entities ce_other ON ce_other.entity_id = e.id
JOIN chunk_entities ce_self ON ce_self.chunk_id = ce_other.chunk_id
{session_join}
WHERE ce_self.entity_id = ?1
AND e.id != ?1{session_where}{kind_clause}
GROUP BY e.id
ORDER BY shared_chunks DESC, e.value ASC, e.id ASC
LIMIT ?{limit_pos}"
);
let mut total_params: Vec<rusqlite::types::Value> = vec![source_id.to_string().into()];
if let Some(s) = session_str {
total_params.push(s.to_string().into());
}
if let Some(k) = &kind_str {
total_params.push(k.clone().into());
}
let total_param_refs: Vec<&dyn rusqlite::ToSql> = total_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let total_neighbors: i64 = conn
.query_row(&total_sql, total_param_refs.as_slice(), |row| row.get(0))
.with_context(|| format!("counting neighbors of {source_id}"))?;
let limit = opts.limit.unwrap_or(usize::MAX).min(i64::MAX as usize) as i64;
let mut list_params = total_params.clone();
list_params.push(limit.into());
let list_param_refs: Vec<&dyn rusqlite::ToSql> = list_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let mut stmt = conn
.prepare(&list_sql)
.context("preparing entity_neighbors query")?;
let rows = stmt
.query_map(list_param_refs.as_slice(), |row| {
let kind_raw: String = row.get(1)?;
let kind = entity_kind_from_str(&kind_raw).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
1,
rusqlite::types::Type::Text,
Box::new(std::io::Error::other(e.to_string())),
)
})?;
Ok(EntityNeighbor {
id: row.get(0)?,
kind,
value: row.get(2)?,
edge_kind: EdgeKind::CoOccursWith,
shared_chunks: row.get(3)?,
chunk_count: row.get(4)?,
session_count: row.get(5)?,
project_count: row.get(6)?,
user_count: row.get(7)?,
chunks: None,
shared_session_ids: None,
shared_projects: None,
shared_users: None,
})
})
.context("running entity_neighbors query")?;
let mut neighbors = rows.collect::<Result<Vec<_>, _>>()?;
if let Some(per_neighbor_limit) = opts.with_chunks {
for neighbor in &mut neighbors {
neighbor.chunks = Some(load_shared_chunk_refs(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_sessions.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_session_ids = Some(load_shared_chunk_session_ids(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_projects.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_projects = Some(load_shared_chunk_projects(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_users.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_users = Some(load_shared_chunk_users(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
Ok(EntityNeighborsReport {
source_id: source_id.to_string(),
source_kind,
source_value,
neighbors,
total_neighbors,
kind_filter: opts.kind,
session_id: opts.session_id.clone(),
})
}
/// Writes a co-occurrence neighbors report to stdout as plain text.
///
/// The header line shows the source entity's value and kind, the total
/// neighbor count and, in parentheses, whichever of the `kind` /
/// `session_id` filters are set on the report. Every neighbor then gets one
/// summary line, followed by its chunk references, shared session ids,
/// shared projects and shared users for each of those optional lists that
/// is present.
pub fn print_neighbors_text(report: &EntityNeighborsReport) {
    // Collect the active filters in a fixed order: kind first, then session.
    let filters: Vec<String> = report
        .kind_filter
        .map(|k| format!("kind={}", k.as_str()))
        .into_iter()
        .chain(report.session_id.as_ref().map(|s| format!("session_id={s}")))
        .collect();
    let scope = match filters.as_slice() {
        [] => String::new(),
        parts => format!(" ({})", parts.join(", ")),
    };
    let plural = if report.total_neighbors == 1 { "" } else { "s" };
    println!(
        "{} ({}) — {} neighbor{}{}",
        report.source_value,
        report.source_kind.as_str(),
        report.total_neighbors,
        plural,
        scope,
    );

    if report.neighbors.is_empty() {
        println!(" (no co-occurring entities)");
        return;
    }

    for neighbor in &report.neighbors {
        println!(
            " {kind:<8} edge={edge:<16} shared={shared:<3} total={total:<4} sessions={sessions:<3} projects={projects:<3} users={users:<3} {value}",
            kind = neighbor.kind.as_str(),
            edge = neighbor.edge_kind.as_str(),
            shared = neighbor.shared_chunks,
            total = neighbor.chunk_count,
            sessions = neighbor.session_count,
            projects = neighbor.project_count,
            users = neighbor.user_count,
            value = neighbor.value,
        );
        // An absent optional list simply contributes no lines.
        for chunk in neighbor.chunks.iter().flatten() {
            println!(
                " - {chunk_id} [{uri}] {snippet}",
                chunk_id = chunk.chunk_id,
                uri = chunk.source_uri,
                snippet = chunk.snippet,
            );
        }
        for session in neighbor.shared_session_ids.iter().flatten() {
            println!(" session={session}");
        }
        for project in neighbor.shared_projects.iter().flatten() {
            println!(" project={project}");
        }
        for user in neighbor.shared_users.iter().flatten() {
            println!(" user={user}");
        }
    }
}
/// Prints the co-occurrence neighbors report to stdout as pretty-printed JSON.
///
/// # Errors
/// Returns the serialization error if the report cannot be rendered.
pub fn print_neighbors_json(report: &EntityNeighborsReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
/// Options accepted by `entity_session_neighbors`.
///
/// Every field is optional; the `Default` value applies no filter, no cap
/// and requests no per-neighbor detail lists.
#[derive(Debug, Clone, Default)]
pub struct EntitySessionNeighborsOptions {
    /// When set, only neighbors of this entity kind are counted and listed.
    pub kind: Option<EntityKind>,
    /// Maximum number of neighbors to list; `None` applies no cap. The
    /// reported total is computed without this limit.
    pub limit: Option<usize>,
    /// When set, restricts the whole query to chunks of this session.
    pub session_id: Option<String>,
    /// Per-neighbor cap on shared session ids to load; `None` or `0` skips
    /// loading them.
    pub with_sessions: Option<usize>,
    /// Per-neighbor cap on shared projects to load; `None` or `0` skips
    /// loading them.
    pub with_projects: Option<usize>,
    /// Per-neighbor cap on shared users to load; `None` or `0` skips
    /// loading them.
    pub with_users: Option<usize>,
    /// Per-neighbor cap on chunk references to load; `None` or `0` skips
    /// loading them.
    pub with_chunks: Option<usize>,
}
/// One entity that shares at least one session with the source entity, as
/// produced by `entity_session_neighbors`.
///
/// The optional list fields are left out of the serialized output when they
/// are `None`.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct EntitySessionNeighbor {
    /// Id of the neighbor entity (`entities.id`).
    pub id: String,
    /// Kind of the neighbor entity.
    pub kind: EntityKind,
    /// Stored value of the neighbor entity.
    pub value: String,
    /// Relationship to the source; `entity_session_neighbors` always sets
    /// `EdgeKind::SameSessionAs`.
    pub edge_kind: EdgeKind,
    /// Number of distinct sessions in which both this neighbor and the
    /// source entity appear.
    pub shared_sessions: i64,
    /// Number of distinct sessions this neighbor appears in (restricted to
    /// the session filter when one is set).
    pub session_count: i64,
    /// Number of distinct non-NULL projects on this neighbor's chunks
    /// (restricted to the session filter when one is set).
    pub project_count: i64,
    /// Number of distinct non-NULL users on this neighbor's chunks
    /// (restricted to the session filter when one is set).
    pub user_count: i64,
    /// Shared session ids, present only when requested via `with_sessions`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shared_session_ids: Option<Vec<String>>,
    /// Projects seen in shared sessions, present only when requested via
    /// `with_projects`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shared_projects: Option<Vec<String>>,
    /// Users seen in shared sessions, present only when requested via
    /// `with_users`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shared_users: Option<Vec<String>>,
    /// Chunk references from shared sessions, present only when requested
    /// via `with_chunks`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub chunks: Option<Vec<EntityChunkRef>>,
}
/// Result of `entity_session_neighbors`: the source entity, its
/// session-level neighbors and the filters that were applied.
#[derive(Debug, Clone, Serialize)]
pub struct EntitySessionNeighborsReport {
    /// Id of the entity whose neighbors were requested.
    pub source_id: String,
    /// Kind of the source entity.
    pub source_kind: EntityKind,
    /// Stored value of the source entity.
    pub source_value: String,
    /// Number of distinct sessions the source entity appears in (restricted
    /// to the session filter when one is set).
    pub source_session_count: i64,
    /// Neighbors in result order, truncated to the requested limit.
    pub neighbors: Vec<EntitySessionNeighbor>,
    /// Number of matching neighbors before the limit is applied.
    pub total_neighbors: i64,
    /// Kind filter that was applied, if any.
    pub kind_filter: Option<EntityKind>,
    /// Session filter that was applied, if any; omitted from serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
}
pub fn entity_session_neighbors(
conn: &Connection,
source_id: &str,
opts: &EntitySessionNeighborsOptions,
) -> Result<EntitySessionNeighborsReport> {
let (source_kind_raw, source_value) = conn
.query_row(
"SELECT kind, value FROM entities WHERE id = ?1",
params![source_id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
)
.map_err(|err| match err {
rusqlite::Error::QueryReturnedNoRows => {
anyhow::anyhow!("no entity with id '{source_id}'")
}
other => anyhow::Error::from(other).context(format!("looking up entity {source_id}")),
})?;
let source_kind = entity_kind_from_str(&source_kind_raw)?;
let kind_str = opts.kind.map(|k| k.as_str().to_string());
let session_str = opts.session_id.as_deref();
let source_session_count: i64 = match session_str {
Some(s) => conn
.query_row(
"SELECT COUNT(DISTINCT c.session_id)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1 AND c.session_id = ?2",
params![source_id, s],
|row| row.get(0),
)
.with_context(|| format!("counting sessions for entity {source_id}"))?,
None => conn
.query_row(
"SELECT COUNT(DISTINCT c.session_id)
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1 AND c.session_id IS NOT NULL",
params![source_id],
|row| row.get(0),
)
.with_context(|| format!("counting sessions for entity {source_id}"))?,
};
let session_pos: Option<usize> = session_str.map(|_| 2);
let kind_pos: Option<usize> = if kind_str.is_some() {
Some(if session_pos.is_some() { 3 } else { 2 })
} else {
None
};
let limit_pos: usize = match (session_pos.is_some(), kind_pos.is_some()) {
(true, true) => 4,
(true, false) => 3,
(false, true) => 3,
(false, false) => 2,
};
let kind_clause = match kind_pos {
Some(p) => format!(" AND e.kind = ?{p}"),
None => String::new(),
};
let shared_sessions_subquery = match session_pos {
Some(p) => format!(
"(SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id = ?{p})"
),
None => "(SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id IS NOT NULL)"
.to_string(),
};
let session_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.session_id)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p})"
),
None => "(SELECT COUNT(DISTINCT c2.session_id)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id IS NOT NULL)"
.to_string(),
};
let project_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.project)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p}
AND c2.project IS NOT NULL)"
),
None => "(SELECT COUNT(DISTINCT c2.project)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.project IS NOT NULL)"
.to_string(),
};
let user_count_subquery = match session_pos {
Some(p) => format!(
"(SELECT COUNT(DISTINCT c2.user)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.session_id = ?{p}
AND c2.user IS NOT NULL)"
),
None => "(SELECT COUNT(DISTINCT c2.user)
FROM chunk_entities ce2
JOIN chunks c2 ON c2.id = ce2.chunk_id
WHERE ce2.entity_id = e.id AND c2.user IS NOT NULL)"
.to_string(),
};
let session_where = match session_pos {
Some(p) => format!(" AND c.session_id = ?{p}"),
None => String::new(),
};
let total_sql = format!(
"SELECT COUNT(*) FROM (
SELECT e.id
FROM entities e
JOIN chunk_entities ce ON ce.entity_id = e.id
JOIN chunks c ON c.id = ce.chunk_id
WHERE e.id != ?1
AND c.session_id IS NOT NULL{session_where}
AND c.session_id IN {shared_sessions_subquery}{kind_clause}
GROUP BY e.id
)"
);
let list_sql = format!(
"SELECT e.id, e.kind, e.value,
COUNT(DISTINCT c.session_id) AS shared_sessions,
{session_count_subquery} AS session_count,
{project_count_subquery} AS project_count,
{user_count_subquery} AS user_count
FROM entities e
JOIN chunk_entities ce ON ce.entity_id = e.id
JOIN chunks c ON c.id = ce.chunk_id
WHERE e.id != ?1
AND c.session_id IS NOT NULL{session_where}
AND c.session_id IN {shared_sessions_subquery}{kind_clause}
GROUP BY e.id
ORDER BY shared_sessions DESC, e.value ASC, e.id ASC
LIMIT ?{limit_pos}"
);
let mut total_params: Vec<rusqlite::types::Value> = vec![source_id.to_string().into()];
if let Some(s) = session_str {
total_params.push(s.to_string().into());
}
if let Some(k) = &kind_str {
total_params.push(k.clone().into());
}
let total_param_refs: Vec<&dyn rusqlite::ToSql> = total_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let total_neighbors: i64 = conn
.query_row(&total_sql, total_param_refs.as_slice(), |row| row.get(0))
.with_context(|| format!("counting session-neighbors of {source_id}"))?;
let limit = opts.limit.unwrap_or(usize::MAX).min(i64::MAX as usize) as i64;
let mut list_params = total_params.clone();
list_params.push(limit.into());
let list_param_refs: Vec<&dyn rusqlite::ToSql> = list_params
.iter()
.map(|v| v as &dyn rusqlite::ToSql)
.collect();
let mut stmt = conn
.prepare(&list_sql)
.context("preparing entity_session_neighbors query")?;
let rows = stmt
.query_map(list_param_refs.as_slice(), |row| {
let kind_raw: String = row.get(1)?;
let kind = entity_kind_from_str(&kind_raw).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
1,
rusqlite::types::Type::Text,
Box::new(std::io::Error::other(e.to_string())),
)
})?;
Ok(EntitySessionNeighbor {
id: row.get(0)?,
kind,
value: row.get(2)?,
edge_kind: EdgeKind::SameSessionAs,
shared_sessions: row.get(3)?,
session_count: row.get(4)?,
project_count: row.get(5)?,
user_count: row.get(6)?,
shared_session_ids: None,
shared_projects: None,
shared_users: None,
chunks: None,
})
})
.context("running entity_session_neighbors query")?;
let mut neighbors = rows.collect::<Result<Vec<_>, _>>()?;
if let Some(per_neighbor_limit) = opts.with_sessions.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_session_ids = Some(load_shared_session_ids(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_projects.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_projects = Some(load_shared_session_projects(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_users.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.shared_users = Some(load_shared_session_users(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
if let Some(per_neighbor_limit) = opts.with_chunks.filter(|n| *n > 0) {
for neighbor in &mut neighbors {
neighbor.chunks = Some(load_shared_session_chunk_refs(
conn,
source_id,
&neighbor.id,
per_neighbor_limit,
session_str,
)?);
}
}
Ok(EntitySessionNeighborsReport {
source_id: source_id.to_string(),
source_kind,
source_value,
source_session_count,
neighbors,
total_neighbors,
kind_filter: opts.kind,
session_id: opts.session_id.clone(),
})
}
fn load_entity_projects(
conn: &Connection,
entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.project
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.session_id = ?2
AND c.project IS NOT NULL
ORDER BY c.project ASC
LIMIT ?3",
)
.context("preparing session-scoped entity-projects query")?;
let rows = stmt
.query_map(params![entity_id, session, limit], |row| {
row.get::<_, String>(0)
})
.context("running session-scoped entity-projects query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.project
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.project IS NOT NULL
ORDER BY c.project ASC
LIMIT ?2",
)
.context("preparing entity-projects query")?;
let rows = stmt
.query_map(params![entity_id, limit], |row| row.get::<_, String>(0))
.context("running entity-projects query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_entity_users(
conn: &Connection,
entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.user
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.session_id = ?2
AND c.user IS NOT NULL
ORDER BY c.user ASC
LIMIT ?3",
)
.context("preparing session-scoped entity-users query")?;
let rows = stmt
.query_map(params![entity_id, session, limit], |row| {
row.get::<_, String>(0)
})
.context("running session-scoped entity-users query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.user
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.user IS NOT NULL
ORDER BY c.user ASC
LIMIT ?2",
)
.context("preparing entity-users query")?;
let rows = stmt
.query_map(params![entity_id, limit], |row| row.get::<_, String>(0))
.context("running entity-users query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_entity_topics(
conn: &Connection,
entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.topic
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.session_id = ?2
AND c.topic IS NOT NULL
ORDER BY c.topic ASC
LIMIT ?3",
)
.context("preparing session-scoped entity-topics query")?;
let rows = stmt
.query_map(params![entity_id, session, limit], |row| {
row.get::<_, String>(0)
})
.context("running session-scoped entity-topics query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.topic
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.topic IS NOT NULL
ORDER BY c.topic ASC
LIMIT ?2",
)
.context("preparing entity-topics query")?;
let rows = stmt
.query_map(params![entity_id, limit], |row| row.get::<_, String>(0))
.context("running entity-topics query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_entity_session_ids(
conn: &Connection,
entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.session_id
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.session_id = ?2
ORDER BY c.session_id ASC
LIMIT ?3",
)
.context("preparing session-scoped entity-session-ids query")?;
let rows = stmt
.query_map(params![entity_id, session, limit], |row| {
row.get::<_, String>(0)
})
.context("running session-scoped entity-session-ids query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT c.session_id
FROM chunk_entities ce
JOIN chunks c ON c.id = ce.chunk_id
WHERE ce.entity_id = ?1
AND c.session_id IS NOT NULL
ORDER BY c.session_id ASC
LIMIT ?2",
)
.context("preparing entity-session-ids query")?;
let rows = stmt
.query_map(params![entity_id, limit], |row| row.get::<_, String>(0))
.context("running entity-session-ids query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
fn load_shared_session_ids(
conn: &Connection,
source_entity_id: &str,
neighbor_entity_id: &str,
limit: usize,
session_filter: Option<&str>,
) -> Result<Vec<String>> {
if limit == 0 {
return Ok(Vec::new());
}
let limit = limit.min(i64::MAX as usize) as i64;
if let Some(session) = session_filter {
let mut stmt = conn
.prepare(
"SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1
AND cs.session_id = ?2
AND cs.session_id IN (
SELECT DISTINCT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?3
AND cn.session_id = ?2
)
ORDER BY cs.session_id ASC
LIMIT ?4",
)
.context("preparing session-scoped shared-session-ids query")?;
let rows = stmt
.query_map(
params![source_entity_id, session, neighbor_entity_id, limit],
|row| row.get::<_, String>(0),
)
.context("running session-scoped shared-session-ids query")?;
return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
}
let mut stmt = conn
.prepare(
"SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1
AND cs.session_id IS NOT NULL
AND cs.session_id IN (
SELECT DISTINCT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2
AND cn.session_id IS NOT NULL
)
ORDER BY cs.session_id ASC
LIMIT ?3",
)
.context("preparing shared-session-ids query")?;
let rows = stmt
.query_map(
params![source_entity_id, neighbor_entity_id, limit],
|row| row.get::<_, String>(0),
)
.context("running shared-session-ids query")?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Returns up to `limit` distinct non-NULL `project` values, in ascending
/// order, taken from chunks that mention either entity inside a session
/// shared by `source_entity_id` and `neighbor_entity_id`.
///
/// A session counts as shared only when both entities occur in it. With
/// `session_filter` set, only that session is considered, and the result is
/// empty unless both entities occur in it. The scoped query previously
/// skipped that check and could report projects for a session the pair does
/// not share. A `limit` of zero returns an empty list without touching the
/// database.
///
/// # Errors
/// Returns a contextualized database error when preparing or running the
/// query fails.
fn load_shared_session_projects(
    conn: &Connection,
    source_entity_id: &str,
    neighbor_entity_id: &str,
    limit: usize,
    session_filter: Option<&str>,
) -> Result<Vec<String>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    // Clamp before the cast so the bound i64 parameter cannot wrap.
    let limit = limit.min(i64::MAX as usize) as i64;
    if let Some(session) = session_filter {
        // Same INTERSECT guard as the unscoped query, narrowed to one session.
        let mut stmt = conn
            .prepare(
                "SELECT DISTINCT c.project
FROM chunks c
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id = ?3
AND c.project IS NOT NULL
AND c.session_id IN (
SELECT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id = ?3
INTERSECT
SELECT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id = ?3
)
ORDER BY c.project ASC
LIMIT ?4",
            )
            .context("preparing session-scoped shared-session-projects query")?;
        let rows = stmt
            .query_map(
                params![source_entity_id, neighbor_entity_id, session, limit],
                |row| row.get::<_, String>(0),
            )
            .context("running session-scoped shared-session-projects query")?;
        return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
    }
    let mut stmt = conn
        .prepare(
            "SELECT DISTINCT c.project
FROM chunks c
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id IS NOT NULL
AND c.project IS NOT NULL
AND c.session_id IN (
SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id IS NOT NULL
INTERSECT
SELECT DISTINCT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id IS NOT NULL
)
ORDER BY c.project ASC
LIMIT ?3",
        )
        .context("preparing shared-session-projects query")?;
    let rows = stmt
        .query_map(
            params![source_entity_id, neighbor_entity_id, limit],
            |row| row.get::<_, String>(0),
        )
        .context("running shared-session-projects query")?;
    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Returns up to `limit` distinct non-NULL `user` values, in ascending
/// order, taken from chunks that mention either entity inside a session
/// shared by `source_entity_id` and `neighbor_entity_id`.
///
/// A session counts as shared only when both entities occur in it. With
/// `session_filter` set, only that session is considered, and the result is
/// empty unless both entities occur in it. The scoped query previously
/// skipped that check and could report users for a session the pair does
/// not share. A `limit` of zero returns an empty list without touching the
/// database.
///
/// # Errors
/// Returns a contextualized database error when preparing or running the
/// query fails.
fn load_shared_session_users(
    conn: &Connection,
    source_entity_id: &str,
    neighbor_entity_id: &str,
    limit: usize,
    session_filter: Option<&str>,
) -> Result<Vec<String>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    // Clamp before the cast so the bound i64 parameter cannot wrap.
    let limit = limit.min(i64::MAX as usize) as i64;
    if let Some(session) = session_filter {
        // Same INTERSECT guard as the unscoped query, narrowed to one session.
        let mut stmt = conn
            .prepare(
                "SELECT DISTINCT c.user
FROM chunks c
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id = ?3
AND c.user IS NOT NULL
AND c.session_id IN (
SELECT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id = ?3
INTERSECT
SELECT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id = ?3
)
ORDER BY c.user ASC
LIMIT ?4",
            )
            .context("preparing session-scoped shared-session-users query")?;
        let rows = stmt
            .query_map(
                params![source_entity_id, neighbor_entity_id, session, limit],
                |row| row.get::<_, String>(0),
            )
            .context("running session-scoped shared-session-users query")?;
        return rows.collect::<Result<Vec<_>, _>>().map_err(Into::into);
    }
    let mut stmt = conn
        .prepare(
            "SELECT DISTINCT c.user
FROM chunks c
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id IS NOT NULL
AND c.user IS NOT NULL
AND c.session_id IN (
SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id IS NOT NULL
INTERSECT
SELECT DISTINCT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id IS NOT NULL
)
ORDER BY c.user ASC
LIMIT ?3",
        )
        .context("preparing shared-session-users query")?;
    let rows = stmt
        .query_map(
            params![source_entity_id, neighbor_entity_id, limit],
            |row| row.get::<_, String>(0),
        )
        .context("running shared-session-users query")?;
    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Returns up to `limit` chunk references, ordered by `created_at` then
/// chunk id, for chunks that mention either entity inside a session shared
/// by `source_entity_id` and `neighbor_entity_id`.
///
/// A session counts as shared only when both entities occur in it. With
/// `session_filter` set, only that session is considered, and the result is
/// empty unless both entities occur in it. The scoped query previously
/// skipped that check and could return chunks for a session the pair does
/// not share. Each chunk appears once even when it mentions both entities
/// (`GROUP BY c.id`). A `limit` of zero returns an empty list without
/// touching the database.
///
/// # Errors
/// Returns a contextualized database error when preparing or running the
/// query fails, or the row-conversion error from `build_chunk_ref`.
fn load_shared_session_chunk_refs(
    conn: &Connection,
    source_entity_id: &str,
    neighbor_entity_id: &str,
    limit: usize,
    session_filter: Option<&str>,
) -> Result<Vec<EntityChunkRef>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    // Clamp before the cast so the bound i64 parameter cannot wrap.
    let limit = limit.min(i64::MAX as usize) as i64;
    if let Some(session) = session_filter {
        // Same INTERSECT guard as the unscoped query, narrowed to one session.
        let mut stmt = conn
            .prepare(
                "SELECT c.id, c.source_id, s.uri, c.text
FROM chunks c
JOIN sources s ON s.id = c.source_id
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id = ?3
AND c.session_id IN (
SELECT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id = ?3
INTERSECT
SELECT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id = ?3
)
GROUP BY c.id
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?4",
            )
            .context("preparing session-scoped shared-session-chunks query")?;
        let rows = stmt
            .query_map(
                params![source_entity_id, neighbor_entity_id, session, limit],
                |row| build_chunk_ref(row),
            )
            .context("running session-scoped shared-session-chunks query")?;
        return Ok(rows.collect::<Result<Vec<_>, _>>()?);
    }
    let mut stmt = conn
        .prepare(
            "SELECT c.id, c.source_id, s.uri, c.text
FROM chunks c
JOIN sources s ON s.id = c.source_id
JOIN chunk_entities ce ON ce.chunk_id = c.id
WHERE ce.entity_id IN (?1, ?2)
AND c.session_id IS NOT NULL
AND c.session_id IN (
SELECT DISTINCT cs.session_id
FROM chunks cs
JOIN chunk_entities ces ON ces.chunk_id = cs.id
WHERE ces.entity_id = ?1 AND cs.session_id IS NOT NULL
INTERSECT
SELECT DISTINCT cn.session_id
FROM chunks cn
JOIN chunk_entities cen ON cen.chunk_id = cn.id
WHERE cen.entity_id = ?2 AND cn.session_id IS NOT NULL
)
GROUP BY c.id
ORDER BY c.created_at ASC, c.id ASC
LIMIT ?3",
        )
        .context("preparing shared-session-chunks query")?;
    let rows = stmt
        .query_map(
            params![source_entity_id, neighbor_entity_id, limit],
            |row| build_chunk_ref(row),
        )
        .context("running shared-session-chunks query")?;
    Ok(rows.collect::<Result<Vec<_>, _>>()?)
}
/// Writes a session-neighbors report to stdout as plain text.
///
/// The header line shows the source entity's value and kind, the total
/// session-neighbor count, how many sessions the source spans and, in
/// parentheses, whichever of the `kind` / `session_id` filters are set.
/// Every neighbor then gets one summary line, followed by its shared
/// session ids, projects, users and chunk references for each of those
/// optional lists that is present.
pub fn print_session_neighbors_text(report: &EntitySessionNeighborsReport) {
    // Collect the active filters in a fixed order: kind first, then session.
    let filters: Vec<String> = report
        .kind_filter
        .map(|k| format!("kind={}", k.as_str()))
        .into_iter()
        .chain(report.session_id.as_ref().map(|s| format!("session_id={s}")))
        .collect();
    let scope = match filters.as_slice() {
        [] => String::new(),
        parts => format!(" ({})", parts.join(", ")),
    };
    let neighbor_plural = if report.total_neighbors == 1 { "" } else { "s" };
    let session_plural = if report.source_session_count == 1 {
        ""
    } else {
        "s"
    };
    println!(
        "{} ({}) — {} session-neighbor{}, source spans {} session{}{scope}",
        report.source_value,
        report.source_kind.as_str(),
        report.total_neighbors,
        neighbor_plural,
        report.source_session_count,
        session_plural,
    );

    if report.neighbors.is_empty() {
        println!(" (no session-co-occurring entities)");
        return;
    }

    for neighbor in &report.neighbors {
        println!(
            " {kind:<8} edge={edge:<16} shared={shared:<3} total={total:<4} projects={projects:<3} users={users:<3} {value}",
            kind = neighbor.kind.as_str(),
            edge = neighbor.edge_kind.as_str(),
            shared = neighbor.shared_sessions,
            total = neighbor.session_count,
            projects = neighbor.project_count,
            users = neighbor.user_count,
            value = neighbor.value,
        );
        // An absent optional list simply contributes no lines.
        for session in neighbor.shared_session_ids.iter().flatten() {
            println!(" session={session}");
        }
        for project in neighbor.shared_projects.iter().flatten() {
            println!(" project={project}");
        }
        for user in neighbor.shared_users.iter().flatten() {
            println!(" user={user}");
        }
        for chunk in neighbor.chunks.iter().flatten() {
            println!(
                " - {chunk_id} [{uri}] {snippet}",
                chunk_id = chunk.chunk_id,
                uri = chunk.source_uri,
                snippet = chunk.snippet,
            );
        }
    }
}
/// Prints the session-neighbors report to stdout as pretty-printed JSON.
///
/// # Errors
/// Returns the serialization error if the report cannot be rendered.
pub fn print_session_neighbors_json(report: &EntitySessionNeighborsReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
/// Writes an entity listing report to stdout as plain text.
///
/// When nothing matched, prints "no entities matched" followed by one line
/// per active filter (kind, value substring, session id). Otherwise prints
/// a header with the number of listed entries and the total matched, then
/// one summary line per entry followed by its session ids, projects, users,
/// topics and chunk references for each of those optional lists that is
/// present.
pub fn print_text(report: &EntityListReport) {
    if report.entries.is_empty() {
        // Echo the filters so an empty result explains itself.
        println!("no entities matched");
        if let Some(kind) = report.kind_filter {
            println!(" kind: {}", kind.as_str());
        }
        if let Some(v) = report.value_contains.as_ref() {
            println!(" value contains: {v}");
        }
        if let Some(session) = report.session_id.as_ref() {
            println!(" session_id: {session}");
        }
        return;
    }

    let shown = report.entries.len();
    let noun_suffix = match shown {
        1 => "y",
        _ => "ies",
    };
    let scope = report
        .session_id
        .as_ref()
        .map(|s| format!(" (session_id={s})"))
        .unwrap_or_default();
    println!(
        "{} entit{} ({} total matched){scope}",
        shown,
        noun_suffix,
        report.total_matched,
    );

    for entry in &report.entries {
        println!(
            " {kind:<8} chunks={chunks:<4} sessions={sessions:<3} projects={projects:<3} users={users:<3} topics={topics:<3} {value}",
            kind = entry.kind.as_str(),
            chunks = entry.chunk_count,
            sessions = entry.session_count,
            projects = entry.project_count,
            users = entry.user_count,
            topics = entry.topic_count,
            value = entry.value,
        );
        // An absent optional list simply contributes no lines.
        for session in entry.session_ids.iter().flatten() {
            println!(" session={session}");
        }
        for project in entry.projects.iter().flatten() {
            println!(" project={project}");
        }
        for user in entry.users.iter().flatten() {
            println!(" user={user}");
        }
        for topic in entry.topics.iter().flatten() {
            println!(" topic={topic}");
        }
        for chunk in entry.chunks.iter().flatten() {
            println!(
                " - {chunk_id} [{uri}] {snippet}",
                chunk_id = chunk.chunk_id,
                uri = chunk.source_uri,
                snippet = chunk.snippet,
            );
        }
    }
}
/// Prints the entity listing report to stdout as pretty-printed JSON.
///
/// # Errors
/// Returns the serialization error if the report cannot be rendered.
pub fn print_json(report: &EntityListReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
/// Stores `entities` and links each of them to `chunk_id`.
///
/// For every entity, a row is inserted into `entities` (keyed by
/// [`entity_id`], with `now` as `created_at`) unless it already exists, and
/// a `chunk_entities` link is inserted unless it already exists.
///
/// Returns the number of links that were newly inserted. An empty slice
/// returns `0` without preparing any statement.
///
/// # Errors
/// Returns a contextualized database error when preparing or executing
/// either statement fails; entities processed before the failure remain
/// written unless the caller rolls back.
pub fn record_chunk_entities(
    conn: &Connection,
    chunk_id: &str,
    entities: &[Entity],
    now: i64,
) -> Result<usize> {
    if entities.is_empty() {
        return Ok(0);
    }
    let mut insert_entity = conn
        .prepare_cached(
            "INSERT OR IGNORE INTO entities (id, kind, value, created_at) VALUES (?1, ?2, ?3, ?4)",
        )
        .context("preparing entity upsert")?;
    let mut insert_link = conn
        .prepare_cached(
            "INSERT OR IGNORE INTO chunk_entities (chunk_id, entity_id) VALUES (?1, ?2)",
        )
        .context("preparing chunk_entities link")?;

    // Accumulate the count of new links, stopping at the first failure.
    entities
        .iter()
        .try_fold(0usize, |linked, entity| -> Result<usize> {
            let id = entity_id(entity.kind, &entity.value);
            insert_entity
                .execute(params![id, entity.kind.as_str(), entity.value, now])
                .with_context(|| {
                    format!("upserting entity {}={}", entity.kind.as_str(), entity.value)
                })?;
            // `execute` reports changed rows: 1 for a new link, 0 if ignored.
            let inserted = insert_link
                .execute(params![chunk_id, id])
                .with_context(|| format!("linking chunk {chunk_id} to entity {id}"))?;
            Ok(linked + inserted)
        })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::Store;
use tempfile::tempdir;
#[test]
fn extracts_basic_https_url() {
let entities = extract_entities("see https://example.com for details");
let urls: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Url)
.map(|e| e.value.as_str())
.collect();
assert_eq!(urls, vec!["https://example.com"]);
}
#[test]
fn extracts_both_schemes() {
let entities = extract_entities("http://a.test and https://b.test/path");
let urls: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Url)
.map(|e| e.value.as_str())
.collect();
assert!(urls.contains(&"http://a.test"));
assert!(urls.contains(&"https://b.test/path"));
}
#[test]
fn extracts_repository_slugs_from_repository_urls() {
let entities = extract_entities(
"see https://github.com/diogenes/lantern and https://git.skylantix.com/diogenes/lantern/",
);
let repos: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Repo)
.map(|e| e.value.as_str())
.collect();
assert_eq!(repos, vec!["diogenes/lantern"]);
}
#[test]
fn extracts_domain_from_basic_url() {
let entities = extract_entities("see https://example.com/page for details");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["example.com"]);
}
#[test]
fn url_domain_lowercases_the_host() {
let entities = extract_entities("visit https://Example.COM/path");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["example.com"]);
}
#[test]
fn url_domain_strips_port_and_userinfo() {
let entities =
extract_entities("ping https://user:pass@api.example.com:8080/v1/probe please");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["api.example.com"]);
}
#[test]
fn url_domain_dedupes_across_paths_and_schemes() {
let entities = extract_entities(
"compare http://example.com/a, https://example.com/b, https://example.com/c",
);
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["example.com"]);
}
#[test]
fn extracts_domain_from_email() {
let entities = extract_entities("reach me at alice@example.com today");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["example.com"]);
}
#[test]
fn url_and_email_domains_dedupe_on_same_host() {
let entities = extract_entities("see https://example.com/page or email me@example.com");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["example.com"]);
}
#[test]
fn ipv4_host_extracts_as_domain() {
let entities = extract_entities("local server http://192.168.1.10:8080/health");
let domains: Vec<_> = entities
.iter()
.filter(|e| e.kind == EntityKind::Domain)
.map(|e| e.value.as_str())
.collect();
assert_eq!(domains, vec!["192.168.1.10"]);
}
/// A bracketed IPv6 URL is still captured as a URL entity but must not produce a domain.
#[test]
fn ipv6_bracket_host_does_not_extract_as_domain() {
    let extracted = extract_entities("ping https://[::1]:8080/health");
    let mut urls: Vec<&str> = Vec::new();
    let mut domains: Vec<&str> = Vec::new();
    // Single pass: sort each entity into the bucket for its kind, ignoring the rest.
    for entity in extracted.iter() {
        match entity.kind {
            EntityKind::Url => urls.push(entity.value.as_str()),
            EntityKind::Domain => domains.push(entity.value.as_str()),
            _ => {}
        }
    }
    assert_eq!(urls, vec!["https://[::1]:8080/health"]);
    assert!(
        domains.is_empty(),
        "ipv6 literal should not produce a domain entity, got: {domains:?}"
    );
}
/// A host without a dot (`localhost`) must not be reported as a domain entity.
#[test]
fn single_label_host_does_not_extract_as_domain() {
    let extracted = extract_entities("hit http://localhost:3000/health for status");
    let mut domains: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Domain {
            domains.push(entity.value.as_str());
        }
    }
    assert!(
        domains.is_empty(),
        "single-label host should not produce a domain entity, got: {domains:?}"
    );
}
/// A trailing `,` or `.` that belongs to the sentence must be cut off the extracted URL.
#[test]
fn trims_trailing_sentence_punctuation() {
    let text = "visit https://example.com/page, or https://other.test.";
    let extracted = extract_entities(text);
    let links: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Url))
        .map(|entity| entity.value.as_str())
        .collect();
    assert_eq!(links, vec!["https://example.com/page", "https://other.test"]);
}
/// A URL repeated in one text is reported once, in first-seen order.
#[test]
fn dedups_within_same_text() {
    let text = "https://x.test and again https://x.test plus https://y.test";
    let extracted = extract_entities(text);
    let mut links: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Url {
            links.push(entity.value.as_str());
        }
    }
    assert_eq!(links, vec!["https://x.test", "https://y.test"]);
}
/// `http://` / `https://` with nothing after them must yield no entities at all.
#[test]
fn ignores_bare_scheme_without_host() {
    assert!(extract_entities("not a url: http:// or https:// alone").is_empty());
}
/// Emails are extracted in order, with the sentence's trailing `,` / `.` removed.
#[test]
fn extracts_emails_and_trims_punctuation() {
    let text = "reach me at alice@example.com, or bob.smith+test@sub.example.co.uk.";
    let extracted = extract_entities(text);
    let addresses: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Email))
        .map(|entity| entity.value.as_str())
        .collect();
    let expected = vec!["alice@example.com", "bob.smith+test@sub.example.co.uk"];
    assert_eq!(addresses, expected);
}
/// Backtick-quoted paths are file-path entities; a bare identifier (`Vec`) and a
/// backtick-quoted URL are not.
#[test]
fn extracts_backtick_wrapped_file_paths() {
    let text =
        "look at `src/main.rs`, `Cargo.toml`, and `Vec` while ignoring `https://example.com`";
    let extracted = extract_entities(text);
    let mut paths: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::FilePath {
            paths.push(entity.value.as_str());
        }
    }
    assert_eq!(paths, vec!["src/main.rs", "Cargo.toml"]);
}
/// Unquoted paths starting with `./`, `/` or `../` are file-path entities; `issue/123`
/// and the path portion of a URL are not.
#[test]
fn extracts_plain_file_paths_with_relative_or_absolute_prefixes() {
    let text = "check ./src/main.rs, /tmp/example.log, and ../notes/todo.md but not issue/123 or https://example.com/src/main.rs";
    let extracted = extract_entities(text);
    let paths: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::FilePath))
        .map(|entity| entity.value.as_str())
        .collect();
    let expected = vec!["./src/main.rs", "/tmp/example.log", "../notes/todo.md"];
    assert_eq!(paths, expected);
}
/// `entity_id` must be stable for equal inputs, differ between kinds for the same
/// value, and be 32 characters long.
#[test]
fn entity_id_is_deterministic_and_kind_scoped() {
    let value = "https://example.com";
    let id_for = |kind: EntityKind| entity_id(kind, value);

    let url_id = id_for(EntityKind::Url);
    assert_eq!(url_id, id_for(EntityKind::Url));

    // The URL id must not collide with the id of any other kind.
    for other in [
        EntityKind::Repo,
        EntityKind::Domain,
        EntityKind::Email,
        EntityKind::FilePath,
        EntityKind::Mention,
        EntityKind::Hashtag,
    ] {
        assert_ne!(url_id, id_for(other));
    }

    // Spot-check two pairs that do not involve the URL kind.
    assert_ne!(id_for(EntityKind::Email), id_for(EntityKind::Mention));
    assert_ne!(id_for(EntityKind::Domain), id_for(EntityKind::Repo));
    assert_eq!(url_id.len(), 32);
}
/// `@handle` tokens are extracted with the `@` kept and trailing punctuation dropped.
#[test]
fn extracts_basic_mentions() {
    let extracted = extract_entities("Hi @alice and @bob_smith — see also @charlie.");
    let mut handles: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Mention {
            handles.push(entity.value.as_str());
        }
    }
    assert_eq!(handles, vec!["@alice", "@bob_smith", "@charlie"]);
}
/// A mention is recognized even when it is the very first token of the text.
#[test]
fn mention_at_start_of_text() {
    let extracted = extract_entities("@alice says hi");
    let handles: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Mention))
        .map(|entity| entity.value.as_str())
        .collect();
    assert_eq!(handles, vec!["@alice"]);
}
/// The `@example.com` part of an email must not be extracted as a mention.
#[test]
fn email_does_not_double_extract_as_mention() {
    let extracted = extract_entities("ping me@example.com");

    let mention_count = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Mention))
        .count();
    assert_eq!(mention_count, 0);

    let mut addresses: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Email {
            addresses.push(entity.value.as_str());
        }
    }
    assert_eq!(addresses, vec!["me@example.com"]);
}
/// A standalone mention and an email in the same text are both extracted, each as its
/// own kind.
#[test]
fn mention_and_email_can_coexist() {
    let extracted = extract_entities("@alice emailed bob@example.com today");
    let mut handles: Vec<&str> = Vec::new();
    let mut addresses: Vec<&str> = Vec::new();
    // Single pass: bucket by kind, skipping everything that is neither.
    for entity in extracted.iter() {
        match entity.kind {
            EntityKind::Mention => handles.push(entity.value.as_str()),
            EntityKind::Email => addresses.push(entity.value.as_str()),
            _ => {}
        }
    }
    assert_eq!(handles, vec!["@alice"]);
    assert_eq!(addresses, vec!["bob@example.com"]);
}
/// A handle mentioned twice in one text is reported once, in first-seen order.
#[test]
fn mention_dedupes_within_text() {
    let extracted = extract_entities("@bob said hi, then @bob left, finally @carol");
    let handles: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Mention))
        .map(|entity| entity.value.as_str())
        .collect();
    assert_eq!(handles, vec!["@bob", "@carol"]);
}
/// Trailing `!`, `.` and `-` after a handle are not part of the mention.
#[test]
fn mention_trims_trailing_separators() {
    let extracted = extract_entities("Ping @charlie! Then @dana. And @eve-");
    let mut handles: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Mention {
            handles.push(entity.value.as_str());
        }
    }
    assert_eq!(handles, vec!["@charlie", "@dana", "@eve"]);
}
/// A bare `@` and a one-character handle are rejected; a two-character handle is kept.
#[test]
fn mention_skips_short_or_empty_handles() {
    let extracted = extract_entities("@ alone, @x too short, but @ab is fine");
    let handles: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Mention))
        .map(|entity| entity.value.as_str())
        .collect();
    assert_eq!(handles, vec!["@ab"]);
}
/// `@2024-01-15` and `@1.2.3` are not mentions, while `@v1.2.3` is.
#[test]
fn mention_skips_pure_digit_or_date_handles() {
    let extracted = extract_entities("see @2024-01-15 or @1.2.3 vs @v1.2.3");
    let mut handles: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Mention {
            handles.push(entity.value.as_str());
        }
    }
    assert_eq!(handles, vec!["@v1.2.3"]);
}
/// Standalone `#tags` are extracted; a URL fragment (`/#anchor`) and a mid-word `#`
/// (`foo#bar`) are not.
#[test]
fn extracts_hashtags_and_ignores_url_fragments() {
    let text =
        "follow #Lantern and #rust-lang, but keep https://example.com/#anchor and foo#bar as-is";
    let extracted = extract_entities(text);
    let hashtags: Vec<&str> = extracted
        .iter()
        .filter(|entity| matches!(entity.kind, EntityKind::Hashtag))
        .map(|entity| entity.value.as_str())
        .collect();
    assert_eq!(hashtags, vec!["#Lantern", "#rust-lang"]);
}
/// A bare `#`, a one-character tag and an all-digit tag are rejected; `#v1` is kept.
#[test]
fn hashtag_skips_short_or_digit_only_tags() {
    let extracted = extract_entities("# alone, #x too short, #2024 no, but #v1 is fine");
    let mut hashtags: Vec<&str> = Vec::new();
    for entity in extracted.iter() {
        if entity.kind == EntityKind::Hashtag {
            hashtags.push(entity.value.as_str());
        }
    }
    assert_eq!(hashtags, vec!["#v1"]);
}
/// Recording the same URL entity for two chunks must leave one row in `entities` and
/// two rows in `chunk_entities`.
#[test]
fn record_chunk_entities_dedups_across_chunks() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();

    // Seed one source with two chunks and link both to the same entity in one transaction.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('src1', 'mem://t', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    for (chunk_id, ordinal) in [("c1", 0), ("c2", 1)] {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, created_at)
VALUES (?1, 'src1', ?2, 0, 0, 0, '', '', 0)",
            params![chunk_id, ordinal],
        )
        .unwrap();
    }
    let shared_url = Entity {
        kind: EntityKind::Url,
        value: String::from("https://shared.test"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        record_chunk_entities(&tx, chunk_id, std::slice::from_ref(&shared_url), now).unwrap();
    }
    tx.commit().unwrap();

    // Runs a `SELECT COUNT(*)`-style query and returns its single integer result.
    let count_rows = |sql: &str| -> i64 {
        store
            .conn()
            .query_row(sql, [], |row| row.get(0))
            .unwrap()
    };
    assert_eq!(
        count_rows("SELECT COUNT(*) FROM entities"),
        1,
        "shared URL must collapse to a single entity"
    );
    assert_eq!(
        count_rows("SELECT COUNT(*) FROM chunk_entities"),
        2,
        "both chunks should link to the shared entity"
    );
}
/// Test helper: inserts the `list_src` source row and one empty chunk per entry of
/// `ids`, all inside a single transaction. Each chunk's ordinal is its index in `ids`.
fn seed_chunks(store: &mut Store, ids: &[&str]) {
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    // Pair each id with a running i64 ordinal starting at zero.
    for (id, ordinal) in ids.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', 0)",
            params![id, ordinal],
        )
        .unwrap();
    }
    tx.commit().unwrap();
}
/// Test helper: like `seed_chunks`, but each row is `(chunk_id, session_id)` and the
/// optional session is written to the chunk's `session_id` column.
fn seed_chunks_with_sessions(store: &mut Store, rows: &[(&str, Option<&str>)]) {
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    // Ordinals follow the position of each row in `rows`.
    for ((id, session), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, 0)",
            params![id, ordinal, session],
        )
        .unwrap();
    }
    tx.commit().unwrap();
}
/// Test helper: like `seed_chunks`, but each row is `(chunk_id, project)` and the
/// optional project is written to the chunk's `project` column.
fn seed_chunks_with_projects(store: &mut Store, rows: &[(&str, Option<&str>)]) {
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    // Ordinals follow the position of each row in `rows`.
    for ((id, project), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, 0)",
            params![id, ordinal, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();
}
/// Test helper: like `seed_chunks`, but each row is `(chunk_id, user)` and the
/// optional user is written to the chunk's `user` column.
fn seed_chunks_with_users(store: &mut Store, rows: &[(&str, Option<&str>)]) {
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    // Ordinals follow the position of each row in `rows`.
    for ((id, user), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, 0)",
            params![id, ordinal, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();
}
/// Test helper: like `seed_chunks`, but each row is `(chunk_id, topic)` and the
/// optional topic is written to the chunk's `topic` column.
fn seed_chunks_with_topics(store: &mut Store, rows: &[(&str, Option<&str>)]) {
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    // Ordinals follow the position of each row in `rows`.
    for ((id, topic), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, topic, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, 0)",
            params![id, ordinal, topic],
        )
        .unwrap();
    }
    tx.commit().unwrap();
}
/// Test helper: records `entities` for `chunk_id` at timestamp `now` via
/// `record_chunk_entities`, inside its own committed transaction.
fn link(store: &mut Store, chunk_id: &str, entities: &[Entity], now: i64) {
    let tx = store.conn_mut().transaction().unwrap();
    record_chunk_entities(&tx, chunk_id, entities, now).unwrap();
    tx.commit().unwrap();
}
/// With default options the entity linked to three chunks is listed before the one
/// linked to a single chunk.
#[test]
fn list_entities_orders_by_chunk_count_desc() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1", "c2", "c3"]);

    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };
    let lonely = Entity {
        kind: EntityKind::Url,
        value: String::from("https://lonely.test"),
    };
    // c1 carries both entities; c2 and c3 carry only the popular one.
    link(&mut store, "c1", &[popular.clone(), lonely], 100);
    for (chunk_id, now) in [("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, std::slice::from_ref(&popular), now);
    }

    let report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(report.total_matched, 2);
    assert_eq!(report.entries.len(), 2);

    let first = &report.entries[0];
    assert_eq!(first.value, "https://popular.test");
    assert_eq!(first.chunk_count, 3);

    let second = &report.entries[1];
    assert_eq!(second.value, "https://lonely.test");
    assert_eq!(second.chunk_count, 1);
}
/// Setting `kind` in the options restricts the report to entities of that kind.
#[test]
fn list_entities_filters_by_kind() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1"]);

    // One entity of each of three kinds, all attached to the same chunk.
    let mixed = [
        Entity {
            kind: EntityKind::Url,
            value: String::from("https://x.test"),
        },
        Entity {
            kind: EntityKind::Email,
            value: String::from("alice@x.test"),
        },
        Entity {
            kind: EntityKind::Mention,
            value: String::from("@bob"),
        },
    ];
    link(&mut store, "c1", &mixed, 100);

    let email_only = EntityListOptions {
        kind: Some(EntityKind::Email),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &email_only).unwrap();
    assert_eq!(report.total_matched, 1);
    assert_eq!(report.entries.len(), 1);

    let hit = &report.entries[0];
    assert_eq!(hit.kind, EntityKind::Email);
    assert_eq!(hit.value, "alice@x.test");
}
/// A `value_contains` filter of `%20` must match only the value that literally contains
/// `%20`, not every value.
#[test]
fn list_entities_value_contains_treats_percent_as_literal() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);

    let encoded = Entity {
        kind: EntityKind::Url,
        value: String::from("https://example.test/a%20b"),
    };
    let plain = Entity {
        kind: EntityKind::Url,
        value: String::from("https://other.test/plain"),
    };
    link(&mut store, "c1", std::slice::from_ref(&encoded), 100);
    link(&mut store, "c2", std::slice::from_ref(&plain), 200);

    let filter = EntityListOptions {
        value_contains: Some(String::from("%20")),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &filter).unwrap();
    assert_eq!(report.total_matched, 1);
    assert_eq!(report.entries[0].value, "https://example.test/a%20b");
}
/// `limit` caps the number of returned entries while `total_matched` still reports
/// every matching entity.
#[test]
fn list_entities_limit_truncates_but_total_matched_is_full() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1"]);

    // Five distinct URL entities on one chunk.
    let mut many = Vec::with_capacity(5);
    for i in 0..5 {
        many.push(Entity {
            kind: EntityKind::Url,
            value: format!("https://e{i}.test"),
        });
    }
    link(&mut store, "c1", &many, 100);

    let capped = EntityListOptions {
        limit: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &capped).unwrap();
    assert_eq!(report.entries.len(), 2);
    assert_eq!(report.total_matched, 5);
}
/// With `with_chunks: Some(2)` each entry carries chunk references; the test expects
/// `c1` before `c2` even though `c2` was linked first, each with its source URI and text.
#[test]
fn list_entities_with_chunks_includes_chunk_refs() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1", "c2", "c3"]);

    // Give every seeded chunk a recognizable text so snippets can be asserted.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "UPDATE chunks
SET text = CASE id
WHEN 'c1' THEN 'first reference chunk text'
WHEN 'c2' THEN 'second reference chunk text'
WHEN 'c3' THEN 'third reference chunk text'
END
WHERE id IN ('c1', 'c2', 'c3')",
        [],
    )
    .unwrap();
    tx.commit().unwrap();

    let edge = Entity {
        kind: EntityKind::Url,
        value: String::from("https://graph.test/edge"),
    };
    // Deliberately link c2 before c1.
    for (chunk_id, now) in [("c2", 100), ("c1", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&edge), now);
    }

    let with_refs = EntityListOptions {
        with_chunks: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &with_refs).unwrap();
    assert_eq!(report.total_matched, 1);

    let refs = report.entries[0]
        .chunks
        .as_ref()
        .expect("chunk refs should be included");
    assert_eq!(refs.len(), 2);

    let (first, second) = (&refs[0], &refs[1]);
    assert_eq!(first.chunk_id, "c1");
    assert_eq!(first.source_uri, "mem://list");
    assert_eq!(first.snippet, "first reference chunk text");
    assert_eq!(second.chunk_id, "c2");
    assert_eq!(second.snippet, "second reference chunk text");
}
/// `session_ids` must be `None` both with default options and with `with_sessions: Some(0)`.
#[test]
fn list_entities_session_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [("c1", Some("s1")), ("c2", Some("s2"))];
    seed_chunks_with_sessions(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let default_report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(default_report.entries.len(), 1);
    assert!(
        default_report.entries[0].session_ids.is_none(),
        "session evidence must be omitted by default"
    );

    let zeroed = EntityListOptions {
        with_sessions: Some(0),
        ..EntityListOptions::default()
    };
    let zeroed_report = list_entities(store.conn(), &zeroed).unwrap();
    assert!(
        zeroed_report.entries[0].session_ids.is_none(),
        "with_sessions=0 must collapse to the cheap default path"
    );
}
/// Chunks are tagged `s3`, `s1`, `s2` and one is untagged; with `with_sessions: Some(2)`
/// the test expects exactly `["s1", "s2"]`.
#[test]
fn list_entities_with_sessions_orders_and_truncates_deterministically() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("s3")),
        ("c2", Some("s1")),
        ("c3", Some("s2")),
        ("c4", None),
    ];
    seed_chunks_with_sessions(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let top_two = EntityListOptions {
        with_sessions: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &top_two).unwrap();
    assert_eq!(report.entries.len(), 1);

    let expected = vec!["s1".to_string(), "s2".to_string()];
    let sessions = report.entries[0]
        .session_ids
        .as_ref()
        .expect("opt-in session evidence should be present");
    assert_eq!(sessions, &expected);
}
/// When the listing is scoped to session `s2`, session evidence contains only `s2` and
/// `session_count` is 1, even though the entity also appears in `s1`.
#[test]
fn list_entities_with_sessions_collapses_under_session_filter() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [("c1", Some("s1")), ("c2", Some("s2"))];
    seed_chunks_with_sessions(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let scoped = EntityListOptions {
        session_id: Some(String::from("s2")),
        with_sessions: Some(5),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &scoped).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s2"));
    assert_eq!(report.entries.len(), 1);
    assert_eq!(report.entries[0].session_count, 1);

    let expected = vec!["s2".to_string()];
    let sessions = report.entries[0]
        .session_ids
        .as_ref()
        .expect("session-scoped evidence should be present");
    assert_eq!(sessions, &expected);
}
/// `projects` must be `None` both with default options and with `with_projects: Some(0)`.
#[test]
fn list_entities_project_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [("c1", Some("alpha")), ("c2", Some("beta"))];
    seed_chunks_with_projects(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let default_report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(default_report.entries.len(), 1);
    assert!(
        default_report.entries[0].projects.is_none(),
        "project evidence must be omitted by default"
    );

    let zeroed = EntityListOptions {
        with_projects: Some(0),
        ..EntityListOptions::default()
    };
    let zeroed_report = list_entities(store.conn(), &zeroed).unwrap();
    assert!(
        zeroed_report.entries[0].projects.is_none(),
        "with_projects=0 must collapse to the cheap default path"
    );
}
/// Chunks are tagged `gamma`, `alpha`, `beta` and one is untagged; with
/// `with_projects: Some(2)` the test expects exactly `["alpha", "beta"]`.
#[test]
fn list_entities_with_projects_orders_and_truncates_deterministically() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("gamma")),
        ("c2", Some("alpha")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_projects(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let top_two = EntityListOptions {
        with_projects: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &top_two).unwrap();
    assert_eq!(report.entries.len(), 1);

    let expected = vec!["alpha".to_string(), "beta".to_string()];
    let projects = report.entries[0]
        .projects
        .as_ref()
        .expect("opt-in project evidence should be present");
    assert_eq!(projects, &expected);
}
/// Scoped to session `s1`, project evidence lists the projects of that session's chunks
/// (`alpha`, `beta`) and excludes `gamma`, which only occurs in session `s2`.
#[test]
fn list_entities_with_projects_honors_session_scope_without_collapsing() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();

    // Seed chunks carrying both a session and an optional project.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", Some("alpha")),
        ("c2", "s1", Some("beta")),
        ("c3", "s2", Some("gamma")),
        ("c4", "s1", None),
    ];
    for ((id, session, project), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ordinal, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let scoped = EntityListOptions {
        session_id: Some(String::from("s1")),
        with_projects: Some(5),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &scoped).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.entries.len(), 1);
    assert_eq!(report.entries[0].session_count, 1);

    let expected = vec!["alpha".to_string(), "beta".to_string()];
    let projects = report.entries[0]
        .projects
        .as_ref()
        .expect("session-scoped project evidence should be present");
    assert_eq!(projects, &expected);
}
/// `users` must be `None` both with default options and with `with_users: Some(0)`.
#[test]
fn list_entities_user_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [("c1", Some("alice")), ("c2", Some("bob"))];
    seed_chunks_with_users(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let default_report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(default_report.entries.len(), 1);
    assert!(
        default_report.entries[0].users.is_none(),
        "user evidence must be omitted by default"
    );

    let zeroed = EntityListOptions {
        with_users: Some(0),
        ..EntityListOptions::default()
    };
    let zeroed_report = list_entities(store.conn(), &zeroed).unwrap();
    assert!(
        zeroed_report.entries[0].users.is_none(),
        "with_users=0 must collapse to the cheap default path"
    );
}
/// Chunks are tagged `zoe`, `alice`, `bob` and one is untagged; with
/// `with_users: Some(2)` the test expects exactly `["alice", "bob"]`.
#[test]
fn list_entities_with_users_orders_and_truncates_deterministically() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("zoe")),
        ("c2", Some("alice")),
        ("c3", Some("bob")),
        ("c4", None),
    ];
    seed_chunks_with_users(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let top_two = EntityListOptions {
        with_users: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &top_two).unwrap();
    assert_eq!(report.entries.len(), 1);

    let expected = vec!["alice".to_string(), "bob".to_string()];
    let users = report.entries[0]
        .users
        .as_ref()
        .expect("opt-in user evidence should be present");
    assert_eq!(users, &expected);
}
/// Scoped to session `s1`, user evidence lists the users of that session's chunks
/// (`alice`, `bob`) and excludes `carol`, who only occurs in session `s2`.
#[test]
fn list_entities_with_users_honors_session_scope_without_collapsing() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();

    // Seed chunks carrying both a session and an optional user.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", Some("alice")),
        ("c2", "s1", Some("bob")),
        ("c3", "s2", Some("carol")),
        ("c4", "s1", None),
    ];
    for ((id, session, user), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let scoped = EntityListOptions {
        session_id: Some(String::from("s1")),
        with_users: Some(5),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &scoped).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.entries.len(), 1);
    assert_eq!(report.entries[0].session_count, 1);

    let expected = vec!["alice".to_string(), "bob".to_string()];
    let users = report.entries[0]
        .users
        .as_ref()
        .expect("session-scoped user evidence should be present");
    assert_eq!(users, &expected);
}
/// `topics` must be `None` both with default options and with `with_topics: Some(0)`.
#[test]
fn list_entities_topic_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [("c1", Some("alpha")), ("c2", Some("beta"))];
    seed_chunks_with_topics(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let default_report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(default_report.entries.len(), 1);
    assert!(
        default_report.entries[0].topics.is_none(),
        "topic evidence must be omitted by default"
    );

    let zeroed = EntityListOptions {
        with_topics: Some(0),
        ..EntityListOptions::default()
    };
    let zeroed_report = list_entities(store.conn(), &zeroed).unwrap();
    assert!(
        zeroed_report.entries[0].topics.is_none(),
        "with_topics=0 must collapse to the cheap default path"
    );
}
/// Chunks are tagged `zeta`, `alpha`, `beta` and one is untagged; with
/// `with_topics: Some(2)` the test expects exactly `["alpha", "beta"]`.
#[test]
fn list_entities_with_topics_orders_and_truncates_deterministically() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("zeta")),
        ("c2", Some("alpha")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_topics(&mut store, &rows);

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let top_two = EntityListOptions {
        with_topics: Some(2),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &top_two).unwrap();
    assert_eq!(report.entries.len(), 1);

    let expected = vec!["alpha".to_string(), "beta".to_string()];
    let topics = report.entries[0]
        .topics
        .as_ref()
        .expect("opt-in topic evidence should be present");
    assert_eq!(topics, &expected);
}
/// Scoped to session `s1`, topic evidence lists the topics of that session's chunks
/// (`alpha`, `beta`) and excludes `gamma`, which only occurs in session `s2`.
#[test]
fn list_entities_with_topics_honors_session_scope_without_collapsing() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();

    // Seed chunks carrying both a session and an optional topic.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", Some("alpha")),
        ("c2", "s1", Some("beta")),
        ("c3", "s2", Some("gamma")),
        ("c4", "s1", None),
    ];
    for ((id, session, topic), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, topic, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ordinal, session, topic],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let mention = Entity {
        kind: EntityKind::Mention,
        value: String::from("@topic"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&mention), now);
    }

    let scoped = EntityListOptions {
        session_id: Some(String::from("s1")),
        with_topics: Some(5),
        ..EntityListOptions::default()
    };
    let report = list_entities(store.conn(), &scoped).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.entries.len(), 1);
    assert_eq!(report.entries[0].session_count, 1);

    let expected = vec!["alpha".to_string(), "beta".to_string()];
    let topics = report.entries[0]
        .topics
        .as_ref()
        .expect("session-scoped topic evidence should be present");
    assert_eq!(topics, &expected);
}
/// Listing on a freshly initialized store yields no entries and a zero match count.
#[test]
fn list_entities_empty_store_returns_empty_report() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let store = Store::initialize(&store_root).unwrap();
    let default_opts = EntityListOptions::default();
    let report = list_entities(store.conn(), &default_opts).unwrap();
    assert!(report.entries.is_empty());
    assert_eq!(report.total_matched, 0);
}
/// An entity whose chunks were seeded without a `session_id` reports `session_count == 0`.
#[test]
fn list_entities_session_count_is_zero_for_chunks_without_session_id() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);

    let untagged = Entity {
        kind: EntityKind::Url,
        value: String::from("https://untagged.test"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&untagged), now);
    }

    let report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(report.entries.len(), 1);
    let only = &report.entries[0];
    assert_eq!(only.chunk_count, 2);
    assert_eq!(only.session_count, 0);
}
/// `session_count` counts distinct non-null sessions: two chunks in session `a`, one in
/// `b` and one untagged give 2 for the popular entity and 1 for the narrow one.
#[test]
fn list_entities_session_count_counts_distinct_sessions() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("a")),
        ("c2", Some("a")),
        ("c3", Some("b")),
        ("c4", None),
    ];
    seed_chunks_with_sessions(&mut store, &rows);

    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };
    let narrow = Entity {
        kind: EntityKind::Url,
        value: String::from("https://narrow.test"),
    };
    // c1 and c2 carry both entities; c3 and c4 carry only the popular one.
    let both = [popular.clone(), narrow];
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &both, now);
    }
    for (chunk_id, now) in [("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&popular), now);
    }

    let report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(report.entries.len(), 2);

    let first = &report.entries[0];
    assert_eq!(first.value, "https://popular.test");
    assert_eq!(first.chunk_count, 4);
    assert_eq!(
        first.session_count, 2,
        "popular spans sessions a + b; the untagged chunk must not count",
    );

    let second = &report.entries[1];
    assert_eq!(second.value, "https://narrow.test");
    assert_eq!(second.chunk_count, 2);
    assert_eq!(second.session_count, 1);
}
/// An entity whose chunks were seeded without a `project` reports `project_count == 0`.
#[test]
fn list_entities_project_count_is_zero_for_chunks_without_project() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);

    let untagged = Entity {
        kind: EntityKind::Url,
        value: String::from("https://untagged.test"),
    };
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(&untagged), now);
    }

    let report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(report.entries.len(), 1);
    let only = &report.entries[0];
    assert_eq!(only.chunk_count, 2);
    assert_eq!(only.project_count, 0);
}
/// `project_count` counts distinct non-null projects: two chunks in `alpha`, one in
/// `beta` and one untagged give 2 for the popular entity and 1 for the narrow one.
#[test]
fn list_entities_project_count_counts_distinct_projects() {
    let dir = tempdir().unwrap();
    let store_root = dir.path().join("store");
    let mut store = Store::initialize(&store_root).unwrap();
    let rows = [
        ("c1", Some("alpha")),
        ("c2", Some("alpha")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_projects(&mut store, &rows);

    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };
    let narrow = Entity {
        kind: EntityKind::Url,
        value: String::from("https://narrow.test"),
    };
    // c1 and c2 carry both entities; c3 and c4 carry only the popular one.
    let both = [popular.clone(), narrow];
    for (chunk_id, now) in [("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &both, now);
    }
    for (chunk_id, now) in [("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, std::slice::from_ref(&popular), now);
    }

    let report = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(report.entries.len(), 2);

    let first = &report.entries[0];
    assert_eq!(first.value, "https://popular.test");
    assert_eq!(first.chunk_count, 4);
    assert_eq!(
        first.project_count, 2,
        "popular spans projects alpha + beta; the untagged chunk must not count",
    );

    let second = &report.entries[1];
    assert_eq!(second.value, "https://narrow.test");
    assert_eq!(second.chunk_count, 2);
    assert_eq!(second.project_count, 1);
}
/// With every chunk in session `s1` but spread over two projects, a listing
/// filtered to `s1` is expected to report one session and still two projects.
#[test]
fn list_entities_project_count_is_independent_of_session_filter() {
    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a project.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', 's1', ?3, 0)";
    let chunk_projects = [("c1", "alpha"), ("c2", "alpha"), ("c3", "beta")];
    for (ordinal, (chunk_id, project)) in chunk_projects.iter().enumerate() {
        txn.execute(insert_chunk, params![chunk_id, ordinal as i64, project])
            .unwrap();
    }
    txn.commit().unwrap();

    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let only_s1 = EntityListOptions {
        session_id: Some("s1".into()),
        ..EntityListOptions::default()
    };
    let listing = list_entities(store.conn(), &only_s1).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let entry = &listing.entries[0];
    assert_eq!(entry.session_count, 1);
    assert_eq!(
        entry.project_count, 2,
        "project grouping must not collapse under a session_id filter",
    );
}
/// Links one URL to two chunks created through plain `seed_chunks` and expects
/// the listing to count both chunks while reporting `user_count == 0`.
#[test]
fn list_entities_user_count_is_zero_for_chunks_without_user() {
    let untagged = Entity {
        kind: EntityKind::Url,
        value: String::from("https://untagged.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[untagged.clone()], linked_at);
    }

    let listing = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let only = &listing.entries[0];
    assert_eq!(only.chunk_count, 2);
    assert_eq!(only.user_count, 0);
}
/// Expects `user_count` to be the number of distinct users among an entity's
/// chunks, with the user-less chunk contributing nothing.
#[test]
fn list_entities_user_count_counts_distinct_users() {
    let url = |value: &str| Entity {
        kind: EntityKind::Url,
        value: value.to_string(),
    };
    let popular = url("https://popular.test");
    let narrow = url("https://narrow.test");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    // Two chunks by `alice`, one by `bob`, one without a user.
    let chunk_users = [
        ("c1", Some("alice")),
        ("c2", Some("alice")),
        ("c3", Some("bob")),
        ("c4", None),
    ];
    seed_chunks_with_users(&mut store, &chunk_users);

    // `narrow` is attached only to alice's chunks; `popular` to all four.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[popular.clone(), narrow.clone()], linked_at);
    }
    for &(chunk_id, linked_at) in &[("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let listing = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(listing.entries.len(), 2);

    let widest = &listing.entries[0];
    assert_eq!(widest.value, "https://popular.test");
    assert_eq!(widest.chunk_count, 4);
    assert_eq!(
        widest.user_count, 2,
        "popular spans users alice + bob; the untagged chunk must not count",
    );

    let runner_up = &listing.entries[1];
    assert_eq!(runner_up.value, "https://narrow.test");
    assert_eq!(runner_up.chunk_count, 2);
    assert_eq!(runner_up.user_count, 1);
}
/// With every chunk in session `s1` but written by two users, a listing
/// filtered to `s1` is expected to report one session and still two users.
#[test]
fn list_entities_user_count_is_independent_of_session_filter() {
    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a user.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', 's1', ?3, 0)";
    let chunk_users = [("c1", "alice"), ("c2", "alice"), ("c3", "bob")];
    for (ordinal, (chunk_id, user)) in chunk_users.iter().enumerate() {
        txn.execute(insert_chunk, params![chunk_id, ordinal as i64, user])
            .unwrap();
    }
    txn.commit().unwrap();

    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let only_s1 = EntityListOptions {
        session_id: Some("s1".into()),
        ..EntityListOptions::default()
    };
    let listing = list_entities(store.conn(), &only_s1).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let entry = &listing.entries[0];
    assert_eq!(entry.session_count, 1);
    assert_eq!(
        entry.user_count, 2,
        "user grouping must not collapse under a session_id filter",
    );
}
/// Links one URL to two chunks created through plain `seed_chunks` and expects
/// the listing to count both chunks while reporting `topic_count == 0`.
#[test]
fn list_entities_topic_count_is_zero_for_chunks_without_topic() {
    let untagged = Entity {
        kind: EntityKind::Url,
        value: String::from("https://untagged.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[untagged.clone()], linked_at);
    }

    let listing = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let only = &listing.entries[0];
    assert_eq!(only.chunk_count, 2);
    assert_eq!(only.topic_count, 0);
}
/// Expects `topic_count` to be the number of distinct topics among an entity's
/// chunks, with the topic-less chunk contributing nothing.
#[test]
fn list_entities_topic_count_counts_distinct_topics() {
    let url = |value: &str| Entity {
        kind: EntityKind::Url,
        value: value.to_string(),
    };
    let popular = url("https://popular.test");
    let narrow = url("https://narrow.test");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    // Two chunks on topic `alpha`, one on `beta`, one without a topic.
    let chunk_topics = [
        ("c1", Some("alpha")),
        ("c2", Some("alpha")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_topics(&mut store, &chunk_topics);

    // `narrow` is attached only to the `alpha` chunks; `popular` to all four.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[popular.clone(), narrow.clone()], linked_at);
    }
    for &(chunk_id, linked_at) in &[("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let listing = list_entities(store.conn(), &EntityListOptions::default()).unwrap();
    assert_eq!(listing.entries.len(), 2);

    let widest = &listing.entries[0];
    assert_eq!(widest.value, "https://popular.test");
    assert_eq!(widest.chunk_count, 4);
    assert_eq!(
        widest.topic_count, 2,
        "popular spans topics alpha + beta; the untagged chunk must not count",
    );

    let runner_up = &listing.entries[1];
    assert_eq!(runner_up.value, "https://narrow.test");
    assert_eq!(runner_up.chunk_count, 2);
    assert_eq!(runner_up.topic_count, 1);
}
/// With every chunk in session `s1` but spread over two topics, a listing
/// filtered to `s1` is expected to report one session and still two topics.
#[test]
fn list_entities_topic_count_is_independent_of_session_filter() {
    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a topic.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, topic, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', 's1', ?3, 0)";
    let chunk_topics = [("c1", "alpha"), ("c2", "alpha"), ("c3", "beta")];
    for (ordinal, (chunk_id, topic)) in chunk_topics.iter().enumerate() {
        txn.execute(insert_chunk, params![chunk_id, ordinal as i64, topic])
            .unwrap();
    }
    txn.commit().unwrap();

    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let only_s1 = EntityListOptions {
        session_id: Some("s1".into()),
        ..EntityListOptions::default()
    };
    let listing = list_entities(store.conn(), &only_s1).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let entry = &listing.entries[0];
    assert_eq!(entry.session_count, 1);
    assert_eq!(
        entry.topic_count, 2,
        "topic grouping must not collapse under a session_id filter",
    );
}
/// An entity linked to chunks in sessions `a`, `a` and `b` is listed with a
/// filter on session `a`; the test expects two chunks and a session count of one.
#[test]
fn list_entities_session_count_is_one_under_session_filter() {
    let popular = Entity {
        kind: EntityKind::Url,
        value: String::from("https://popular.test"),
    };

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_sessions = [("c1", Some("a")), ("c2", Some("a")), ("c3", Some("b"))];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let only_a = EntityListOptions {
        session_id: Some("a".into()),
        ..EntityListOptions::default()
    };
    let listing = list_entities(store.conn(), &only_a).unwrap();
    assert_eq!(listing.entries.len(), 1);
    let entry = &listing.entries[0];
    assert_eq!(entry.chunk_count, 2);
    assert_eq!(entry.session_count, 1);
}
/// Expects neighbors of `#topic` to come back ordered by shared chunk count
/// (`@frequent` with 2, then `@sometimes` with 1), excluding the entity that
/// never shares a chunk with it and the source entity itself.
#[test]
fn entity_neighbors_ranks_by_shared_chunk_count() {
    let entity = |kind: EntityKind, value: &str| Entity {
        kind,
        value: value.to_string(),
    };
    let topic = entity(EntityKind::Hashtag, "#topic");
    let frequent = entity(EntityKind::Mention, "@frequent");
    let sometimes = entity(EntityKind::Mention, "@sometimes");
    let unrelated = entity(EntityKind::Url, "https://unrelated.test");
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2", "c3"]);
    link(
        &mut store,
        "c1",
        &[topic.clone(), frequent.clone(), sometimes],
        100,
    );
    link(&mut store, "c2", &[topic, frequent], 200);
    link(&mut store, "c3", &[unrelated], 300);

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.source_value, "#topic");
    assert_eq!(report.total_neighbors, 2);
    assert_eq!(report.neighbors.len(), 2);

    let strongest = &report.neighbors[0];
    assert_eq!(strongest.value, "@frequent");
    assert_eq!(strongest.edge_kind, EdgeKind::CoOccursWith);
    assert_eq!(strongest.shared_chunks, 2);

    let weaker = &report.neighbors[1];
    assert_eq!(weaker.value, "@sometimes");
    assert_eq!(weaker.edge_kind, EdgeKind::CoOccursWith);
    assert_eq!(weaker.shared_chunks, 1);

    // The source entity must never be listed as its own neighbor.
    assert!(!report.neighbors.iter().any(|n| n.id == topic_id));
}
/// With a mention and a URL both co-occurring with `#topic`, a `kind` filter of
/// `Url` is expected to leave only the URL in both the total and the list.
#[test]
fn entity_neighbors_filters_by_kind() {
    let entity = |kind: EntityKind, value: &str| Entity {
        kind,
        value: value.to_string(),
    };
    let topic = entity(EntityKind::Hashtag, "#topic");
    let mention = entity(EntityKind::Mention, "@friend");
    let url = entity(EntityKind::Url, "https://example.test/x");
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1"]);
    link(&mut store, "c1", &[topic, mention, url], 100);

    let urls_only = EntityNeighborsOptions {
        kind: Some(EntityKind::Url),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &urls_only).unwrap();
    assert_eq!(report.total_neighbors, 1);
    assert_eq!(report.neighbors.len(), 1);

    let survivor = &report.neighbors[0];
    assert_eq!(survivor.kind, EntityKind::Url);
    assert_eq!(survivor.edge_kind, EdgeKind::CoOccursWith);
    assert_eq!(survivor.value, "https://example.test/x");
}
/// With four neighbors and `limit = 2`, expects `total_neighbors` to stay at
/// four while only two neighbors are returned.
#[test]
fn entity_neighbors_limit_truncates_but_total_is_full() {
    let hub = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#hub"),
    };
    let hub_id = entity_id(hub.kind, &hub.value);
    // The hub followed by four URLs, all placed in the same chunk.
    let linked: Vec<Entity> = std::iter::once(hub)
        .chain((0..4).map(|i| Entity {
            kind: EntityKind::Url,
            value: format!("https://n{i}.test"),
        }))
        .collect();

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1"]);
    link(&mut store, "c1", &linked, 100);

    let capped = EntityNeighborsOptions {
        limit: Some(2),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &hub_id, &capped).unwrap();
    assert_eq!(report.total_neighbors, 4);
    assert_eq!(report.neighbors.len(), 2);
}
/// An entity that is alone in its only chunk is expected to have zero
/// neighbors and an empty neighbor list.
#[test]
fn entity_neighbors_returns_empty_when_no_co_occurrences() {
    let solo = Entity {
        kind: EntityKind::Url,
        value: String::from("https://solo.test"),
    };
    let solo_id = entity_id(solo.kind, &solo.value);

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1"]);
    link(&mut store, "c1", &[solo], 100);

    let defaults = EntityNeighborsOptions::default();
    let report = entity_neighbors(store.conn(), &solo_id, &defaults).unwrap();
    assert_eq!(report.total_neighbors, 0);
    assert!(report.neighbors.is_empty());
}
/// With `with_chunks = 1`, the single neighbor is expected to carry exactly
/// one chunk reference: chunk `c1`, with source URI `mem://list` and the
/// chunk text as its snippet.
#[test]
fn entity_neighbors_with_chunks_includes_shared_chunk_refs() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let friend = Entity {
        kind: EntityKind::Mention,
        value: String::from("@friend"),
    };
    let topic_id = entity_id(topic.kind, &topic.value);

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2", "c3"]);
    {
        // Give every chunk distinguishable text so the snippet can be checked.
        let rewrite_text = "UPDATE chunks
SET text = CASE id
WHEN 'c1' THEN '#topic and @friend in first shared chunk'
WHEN 'c2' THEN '#topic and @friend in second shared chunk'
WHEN 'c3' THEN '@friend only unrelated chunk'
END
WHERE id IN ('c1', 'c2', 'c3')";
        let raw = store.conn_mut();
        let txn = raw.transaction().unwrap();
        txn.execute(rewrite_text, []).unwrap();
        txn.commit().unwrap();
    }

    // `c1` and `c2` are shared by both entities; `c3` holds `@friend` alone.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), friend.clone()], linked_at);
    }
    link(&mut store, "c3", &[friend], 300);

    let with_one_chunk = EntityNeighborsOptions {
        with_chunks: Some(1),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &with_one_chunk).unwrap();
    assert_eq!(report.neighbors.len(), 1);

    let evidence = report.neighbors[0]
        .chunks
        .as_ref()
        .expect("shared chunk refs should be included");
    assert_eq!(evidence.len(), 1);
    let first_ref = &evidence[0];
    assert_eq!(first_ref.chunk_id, "c1");
    assert_eq!(first_ref.source_uri, "mem://list");
    assert_eq!(first_ref.snippet, "#topic and @friend in first shared chunk");
}
/// Asking for neighbors of an id that was never stored must fail with an
/// error whose text contains "no entity with id".
#[test]
fn entity_neighbors_errors_on_unknown_id() {
    let tmp = tempdir().unwrap();
    let store = Store::initialize(&tmp.path().join("store")).unwrap();

    let outcome = entity_neighbors(store.conn(), "nope", &EntityNeighborsOptions::default());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("no entity with id"));
}
/// Expects a neighbor's `chunk_count` to cover all of its chunks (three here),
/// not just the single chunk it shares with the source entity.
#[test]
fn entity_neighbors_reports_neighbor_total_chunk_count() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2", "c3"]);
    link(&mut store, "c1", &[topic, popular.clone()], 100);
    for &(chunk_id, linked_at) in &[("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].edge_kind, EdgeKind::CoOccursWith);
    assert_eq!(report.neighbors[0].shared_chunks, 1);
    assert_eq!(report.neighbors[0].chunk_count, 3);
}
/// Two entities share two chunks created through plain `seed_chunks`; the
/// neighbor is expected to report both chunks but a `session_count` of zero.
#[test]
fn entity_neighbors_session_count_is_zero_for_chunks_without_session_id() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let friend = Entity {
        kind: EntityKind::Mention,
        value: String::from("@friend"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), friend.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 2);
    assert_eq!(report.neighbors[0].chunk_count, 2);
    assert_eq!(report.neighbors[0].session_count, 0);
}
/// Expects a neighbor's `session_count` to be the number of distinct sessions
/// across all of its chunks, ignoring the chunk that has no session.
#[test]
fn entity_neighbors_session_count_counts_distinct_sessions() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    // One chunk in `s1`, two in `s2`, one without a session.
    let chunk_sessions = [
        ("c1", Some("s1")),
        ("c2", Some("s2")),
        ("c3", Some("s2")),
        ("c4", None),
    ];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);

    // Only `c1` is shared with `#topic`; the rest belong to `@popular` alone.
    link(&mut store, "c1", &[topic, popular.clone()], 100);
    for &(chunk_id, linked_at) in &[("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 1);
    assert_eq!(report.neighbors[0].chunk_count, 4);
    assert_eq!(
        report.neighbors[0].session_count, 2,
        "popular spans sessions s1 + s2; the NULL-session chunk must not count",
    );
}
/// Compares an unfiltered neighbor query (session count 2) with one scoped to
/// session `s1`, where the neighbor is expected to show one shared chunk, two
/// chunks in total and a session count of one.
#[test]
fn entity_neighbors_session_count_is_one_under_session_filter() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_sessions = [
        ("c1", Some("s1")),
        ("c2", Some("s1")),
        ("c3", Some("s2")),
        ("c4", Some("s2")),
    ];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);
    link(&mut store, "c1", &[topic.clone(), popular.clone()], 100);
    link(&mut store, "c2", &[popular.clone()], 200);
    link(&mut store, "c3", &[topic], 300);
    link(&mut store, "c4", &[popular], 400);

    let unfiltered =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(unfiltered.neighbors.len(), 1);
    assert_eq!(unfiltered.neighbors[0].session_count, 2);

    let only_s1 = EntityNeighborsOptions {
        session_id: Some("s1".into()),
        ..EntityNeighborsOptions::default()
    };
    let within_s1 = entity_neighbors(store.conn(), &topic_id, &only_s1).unwrap();
    assert_eq!(within_s1.neighbors.len(), 1);
    let neighbor = &within_s1.neighbors[0];
    assert_eq!(neighbor.shared_chunks, 1);
    assert_eq!(neighbor.chunk_count, 2);
    assert_eq!(neighbor.session_count, 1);
}
/// Two entities share two chunks created through plain `seed_chunks`; the
/// neighbor is expected to report both chunks but a `project_count` of zero.
#[test]
fn entity_neighbors_project_count_is_zero_for_chunks_without_project() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let friend = Entity {
        kind: EntityKind::Mention,
        value: String::from("@friend"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), friend.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 2);
    assert_eq!(report.neighbors[0].chunk_count, 2);
    assert_eq!(report.neighbors[0].project_count, 0);
}
/// Expects a neighbor's `project_count` to be the number of distinct projects
/// across all of its chunks, ignoring the chunk that has no project.
#[test]
fn entity_neighbors_project_count_counts_distinct_projects() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    // One chunk in `alpha`, two in `beta`, one without a project.
    let chunk_projects = [
        ("c1", Some("alpha")),
        ("c2", Some("beta")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_projects(&mut store, &chunk_projects);

    // Only `c1` is shared with `#topic`; the rest belong to `@popular` alone.
    link(&mut store, "c1", &[topic, popular.clone()], 100);
    for &(chunk_id, linked_at) in &[("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 1);
    assert_eq!(report.neighbors[0].chunk_count, 4);
    assert_eq!(
        report.neighbors[0].project_count, 2,
        "popular spans projects alpha + beta; the NULL-project chunk must not count",
    );
}
/// Chunks span two sessions and three projects. Unfiltered, the neighbor is
/// expected to show 3 projects / 2 sessions; scoped to `s1` it should show one
/// session, three chunks and still two distinct projects.
#[test]
fn entity_neighbors_project_count_is_independent_of_session_filter() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a project.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)";
    let chunk_rows = [
        ("c1", "s1", "alpha"),
        ("c2", "s1", "alpha"),
        ("c3", "s1", "beta"),
        ("c4", "s2", "gamma"),
    ];
    for (ordinal, (chunk_id, session, project)) in chunk_rows.iter().enumerate() {
        txn.execute(
            insert_chunk,
            params![chunk_id, ordinal as i64, session, project],
        )
        .unwrap();
    }
    txn.commit().unwrap();

    // Both entities appear together in every chunk.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[topic.clone(), popular.clone()], linked_at);
    }

    let unfiltered =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(unfiltered.neighbors.len(), 1);
    assert_eq!(unfiltered.neighbors[0].project_count, 3);
    assert_eq!(unfiltered.neighbors[0].session_count, 2);

    let only_s1 = EntityNeighborsOptions {
        session_id: Some("s1".into()),
        ..EntityNeighborsOptions::default()
    };
    let within_s1 = entity_neighbors(store.conn(), &topic_id, &only_s1).unwrap();
    assert_eq!(within_s1.neighbors.len(), 1);
    let neighbor = &within_s1.neighbors[0];
    assert_eq!(neighbor.session_count, 1);
    assert_eq!(
        neighbor.project_count, 2,
        "project grouping must not collapse under a session_id filter",
    );
    assert_eq!(neighbor.chunk_count, 3);
}
/// Two entities share two chunks created through plain `seed_chunks`; the
/// neighbor is expected to report both chunks but a `user_count` of zero.
#[test]
fn entity_neighbors_user_count_is_zero_for_chunks_without_user() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let friend = Entity {
        kind: EntityKind::Mention,
        value: String::from("@friend"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks(&mut store, &["c1", "c2"]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), friend.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 2);
    assert_eq!(report.neighbors[0].chunk_count, 2);
    assert_eq!(report.neighbors[0].user_count, 0);
}
/// Expects a neighbor's `user_count` to be the number of distinct users across
/// all of its chunks, ignoring the chunk that has no user.
#[test]
fn entity_neighbors_user_count_counts_distinct_users() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    // One chunk by `alice`, two by `bob`, one without a user.
    let chunk_users = [
        ("c1", Some("alice")),
        ("c2", Some("bob")),
        ("c3", Some("bob")),
        ("c4", None),
    ];
    seed_chunks_with_users(&mut store, &chunk_users);

    // Only `c1` is shared with `#topic`; the rest belong to `@popular` alone.
    link(&mut store, "c1", &[topic, popular.clone()], 100);
    for &(chunk_id, linked_at) in &[("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[popular.clone()], linked_at);
    }

    let report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].shared_chunks, 1);
    assert_eq!(report.neighbors[0].chunk_count, 4);
    assert_eq!(
        report.neighbors[0].user_count, 2,
        "popular spans users alice + bob; the NULL-user chunk must not count",
    );
}
/// Chunks span two sessions and three users. Unfiltered, the neighbor is
/// expected to show 3 users / 2 sessions; scoped to `s1` it should show one
/// session, three chunks and still two distinct users.
#[test]
fn entity_neighbors_user_count_is_independent_of_session_filter() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let popular = Entity {
        kind: EntityKind::Mention,
        value: String::from("@popular"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a user.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)";
    let chunk_rows = [
        ("c1", "s1", "alice"),
        ("c2", "s1", "alice"),
        ("c3", "s1", "bob"),
        ("c4", "s2", "carol"),
    ];
    for (ordinal, (chunk_id, session, user)) in chunk_rows.iter().enumerate() {
        txn.execute(
            insert_chunk,
            params![chunk_id, ordinal as i64, session, user],
        )
        .unwrap();
    }
    txn.commit().unwrap();

    // Both entities appear together in every chunk.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &[topic.clone(), popular.clone()], linked_at);
    }

    let unfiltered =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(unfiltered.neighbors.len(), 1);
    assert_eq!(unfiltered.neighbors[0].user_count, 3);
    assert_eq!(unfiltered.neighbors[0].session_count, 2);

    let only_s1 = EntityNeighborsOptions {
        session_id: Some("s1".into()),
        ..EntityNeighborsOptions::default()
    };
    let within_s1 = entity_neighbors(store.conn(), &topic_id, &only_s1).unwrap();
    assert_eq!(within_s1.neighbors.len(), 1);
    let neighbor = &within_s1.neighbors[0];
    assert_eq!(neighbor.session_count, 1);
    assert_eq!(
        neighbor.user_count, 2,
        "user grouping must not collapse under a session_id filter",
    );
    assert_eq!(neighbor.chunk_count, 3);
}
/// `shared_session_ids` must be absent on every neighbor both with default
/// options and when `with_sessions` is explicitly set to zero.
#[test]
fn entity_neighbors_session_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let entity = |kind: EntityKind, value: &str| Entity {
        kind,
        value: value.to_string(),
    };
    let topic = entity(EntityKind::Hashtag, "#topic");
    let shared = entity(EntityKind::Mention, "@shared");
    let only_s2 = entity(EntityKind::Mention, "@only-s2");
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_sessions = [("c1", Some("s1")), ("c2", Some("s2")), ("c3", Some("s3"))];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);
    link(&mut store, "c1", &[topic.clone(), shared.clone()], 100);
    link(&mut store, "c2", &[topic, shared.clone(), only_s2], 200);
    link(&mut store, "c3", &[shared], 300);

    // Default options: two neighbors, none carrying session evidence.
    let default_report =
        entity_neighbors(store.conn(), &topic_id, &EntityNeighborsOptions::default()).unwrap();
    assert_eq!(default_report.neighbors.len(), 2);
    assert!(
        !default_report
            .neighbors
            .iter()
            .any(|n| n.shared_session_ids.is_some())
    );

    // An explicit zero behaves like leaving the option unset.
    let zero_sessions = EntityNeighborsOptions {
        with_sessions: Some(0),
        ..EntityNeighborsOptions::default()
    };
    let zeroed = entity_neighbors(store.conn(), &topic_id, &zero_sessions).unwrap();
    assert!(
        !zeroed
            .neighbors
            .iter()
            .any(|n| n.shared_session_ids.is_some())
    );
}
/// The shared chunks sit in sessions `s3`, `s1`, `s2` (in seeding order); with
/// `with_sessions = 2` the evidence is expected to be exactly `["s1", "s2"]`.
#[test]
fn entity_neighbors_session_evidence_orders_and_truncates_deterministically() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: String::from("@shared"),
    };
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_sessions = [
        ("c1", Some("s3")),
        ("c2", Some("s1")),
        ("c3", Some("s2")),
        ("c4", Some("s4")),
    ];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);

    // `c4` (session `s4`) holds `@shared` without `#topic`.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[topic.clone(), shared.clone()], linked_at);
    }
    link(&mut store, "c4", &[shared], 400);

    let two_sessions = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_sessions: Some(2),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &two_sessions).unwrap();
    assert_eq!(report.neighbors.len(), 1);

    let expected_sessions = vec!["s1".to_string(), "s2".to_string()];
    assert_eq!(
        report.neighbors[0].shared_session_ids.as_ref(),
        Some(&expected_sessions),
    );
}
/// Under a `session_id = "s2"` filter with `with_sessions = 3`, the report is
/// expected to echo the session id, list only `@shared`, and give `["s2"]` as
/// its session evidence.
#[test]
fn entity_neighbors_session_evidence_collapses_under_session_filter() {
    let entity = |kind: EntityKind, value: &str| Entity {
        kind,
        value: value.to_string(),
    };
    let topic = entity(EntityKind::Hashtag, "#topic");
    let shared = entity(EntityKind::Mention, "@shared");
    let only_s2 = entity(EntityKind::Mention, "@only-s2");
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_sessions = [("c1", Some("s1")), ("c2", Some("s2")), ("c3", Some("s2"))];
    seed_chunks_with_sessions(&mut store, &chunk_sessions);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), shared.clone()], linked_at);
    }
    // `@only-s2` never shares a chunk with `#topic`.
    link(&mut store, "c3", &[shared, only_s2], 300);

    let scoped_to_s2 = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        session_id: Some("s2".into()),
        with_sessions: Some(3),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &scoped_to_s2).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s2"));
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].value, "@shared");

    let expected_sessions = vec!["s2".to_string()];
    assert_eq!(
        report.neighbors[0].shared_session_ids.as_ref(),
        Some(&expected_sessions),
    );
}
/// `shared_projects` must be absent on the neighbor both when `with_projects`
/// is left unset and when it is explicitly set to zero.
#[test]
fn entity_neighbors_project_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: String::from("@shared"),
    };
    let topic_id = entity_id(topic.kind, &topic.value);

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_projects(&mut store, &[("c1", Some("alpha")), ("c2", Some("beta"))]);
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200)] {
        link(&mut store, chunk_id, &[topic.clone(), shared.clone()], linked_at);
    }

    // Option left unset: no project evidence.
    let mentions_only = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        ..EntityNeighborsOptions::default()
    };
    let default_report = entity_neighbors(store.conn(), &topic_id, &mentions_only).unwrap();
    assert_eq!(default_report.neighbors.len(), 1);
    assert!(default_report.neighbors[0].shared_projects.is_none());

    // An explicit zero behaves like leaving the option unset.
    let zero_projects = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_projects: Some(0),
        ..EntityNeighborsOptions::default()
    };
    let zeroed = entity_neighbors(store.conn(), &topic_id, &zero_projects).unwrap();
    assert!(zeroed.neighbors[0].shared_projects.is_none());
}
/// The shared chunks sit in projects `gamma`, `alpha`, `beta` (in seeding
/// order); with `with_projects = 2` the evidence is expected to be exactly
/// `["alpha", "beta"]`.
#[test]
fn entity_neighbors_project_evidence_orders_and_truncates_deterministically() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: String::from("@shared"),
    };
    let topic_id = entity_id(topic.kind, &topic.value);

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    let chunk_projects = [
        ("c1", Some("gamma")),
        ("c2", Some("alpha")),
        ("c3", Some("beta")),
        ("c4", None),
    ];
    seed_chunks_with_projects(&mut store, &chunk_projects);

    // `c4` (no project) holds `@shared` without `#topic`.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[topic.clone(), shared.clone()], linked_at);
    }
    link(&mut store, "c4", &[shared], 400);

    let two_projects = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_projects: Some(2),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &two_projects).unwrap();
    assert_eq!(report.neighbors.len(), 1);

    let expected_projects = vec!["alpha".to_string(), "beta".to_string()];
    assert_eq!(
        report.neighbors[0].shared_projects.as_ref(),
        Some(&expected_projects),
    );
}
/// Scoped to session `s1`, project evidence is expected to list both projects
/// seen in that session (`alpha`, `beta`) and to leave out `gamma`, which only
/// occurs in session `s2`.
#[test]
fn entity_neighbors_project_evidence_honors_session_scope_without_collapsing() {
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: String::from("#topic"),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: String::from("@shared"),
    };
    let topic_id = entity_id(topic.kind, &topic.value);

    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are inserted by hand so each chunk carries both a session and a project.
    let raw = store.conn_mut();
    let txn = raw.transaction().unwrap();
    let insert_source = "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)";
    txn.execute(insert_source, []).unwrap();
    let insert_chunk = "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)";
    let chunk_rows = [
        ("c1", Some("s1"), Some("alpha")),
        ("c2", Some("s1"), Some("beta")),
        ("c3", Some("s2"), Some("gamma")),
    ];
    for (ordinal, (chunk_id, session, project)) in chunk_rows.into_iter().enumerate() {
        txn.execute(
            insert_chunk,
            params![chunk_id, ordinal as i64, session, project],
        )
        .unwrap();
    }
    txn.commit().unwrap();

    // Both entities appear together in every chunk.
    for &(chunk_id, linked_at) in &[("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &[topic.clone(), shared.clone()], linked_at);
    }

    let scoped_to_s1 = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        session_id: Some("s1".into()),
        with_projects: Some(5),
        ..EntityNeighborsOptions::default()
    };
    let report = entity_neighbors(store.conn(), &topic_id, &scoped_to_s1).unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.neighbors.len(), 1);

    let expected_projects = vec!["alpha".to_string(), "beta".to_string()];
    assert_eq!(
        report.neighbors[0].shared_projects.as_ref(),
        Some(&expected_projects),
    );
}
/// `shared_users` must be absent both when `with_users` is left at its
/// default and when it is explicitly set to `Some(0)`.
#[test]
fn entity_neighbors_user_evidence_is_opt_in_and_zero_keeps_default_shape() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_users(&mut store, &[("c1", Some("alice")), ("c2", Some("bob"))]);

    let pair = [
        Entity {
            kind: EntityKind::Hashtag,
            value: "#topic".into(),
        },
        Entity {
            kind: EntityKind::Mention,
            value: "@shared".into(),
        },
    ];
    link(&mut store, "c1", &pair, 100);
    link(&mut store, "c2", &pair, 200);
    let source_id = entity_id(EntityKind::Hashtag, "#topic");

    // Case 1: option not mentioned at all.
    let default_opts = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        ..Default::default()
    };
    let by_default = entity_neighbors(store.conn(), &source_id, &default_opts).unwrap();
    assert_eq!(by_default.neighbors.len(), 1);
    assert!(by_default.neighbors[0].shared_users.is_none());

    // Case 2: option explicitly zero.
    let zero_opts = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_users: Some(0),
        ..Default::default()
    };
    let zeroed = entity_neighbors(store.conn(), &source_id, &zero_opts).unwrap();
    assert!(zeroed.neighbors[0].shared_users.is_none());
}
/// Seeds user-tagged chunks in non-alphabetical order (plus one chunk with no
/// user), requests at most two users of evidence for the `@shared` neighbor of
/// `#topic`, and expects exactly `alice`, `bob`.
#[test]
fn entity_neighbors_user_evidence_orders_and_truncates_deterministically() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_users(
        &mut store,
        &[
            ("c1", Some("carol")),
            ("c2", Some("alice")),
            ("c3", Some("bob")),
            ("c4", None),
        ],
    );

    // pair = [#topic, @shared]; `pair[1..]` is just @shared.
    let pair = [
        Entity {
            kind: EntityKind::Hashtag,
            value: "#topic".into(),
        },
        Entity {
            kind: EntityKind::Mention,
            value: "@shared".into(),
        },
    ];
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &pair, num);
    }
    // The user-less chunk only carries @shared.
    link(&mut store, "c4", &pair[1..], 400);

    let source_id = entity_id(EntityKind::Hashtag, "#topic");
    let opts = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_users: Some(2),
        ..Default::default()
    };
    let report = entity_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let expected = vec!["alice".to_string(), "bob".to_string()];
    assert_eq!(report.neighbors[0].shared_users.as_ref(), Some(&expected));
}
/// Inserts three chunks across sessions `s1` and `s2`, each with its own user,
/// links `#topic` and `@shared` to all of them, and checks that a query scoped
/// to `s1` reports only `alice` and `bob` as shared users.
#[test]
fn entity_neighbors_user_evidence_honors_session_scope_without_collapsing() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    // Rows are written directly so session and user can be set together.
    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", Some("s1"), Some("alice")),
        ("c2", Some("s1"), Some("bob")),
        ("c3", Some("s2"), Some("carol")),
    ];
    for ((chunk_id, session, user), ordinal) in rows.into_iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let pair = [
        Entity {
            kind: EntityKind::Hashtag,
            value: "#topic".into(),
        },
        Entity {
            kind: EntityKind::Mention,
            value: "@shared".into(),
        },
    ];
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300)] {
        link(&mut store, chunk_id, &pair, num);
    }

    let source_id = entity_id(EntityKind::Hashtag, "#topic");
    let opts = EntityNeighborsOptions {
        kind: Some(EntityKind::Mention),
        session_id: Some("s1".into()),
        with_users: Some(5),
        ..Default::default()
    };
    let report = entity_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.neighbors.len(), 1);
    // `carol` belongs to the s2 chunk and is expected to be left out.
    let expected = vec!["alice".to_string(), "bob".to_string()];
    assert_eq!(report.neighbors[0].shared_users.as_ref(), Some(&expected));
}
/// `EdgeKind::CoOccursWith` must render as `co_occurs_with` both through
/// `as_str` and through its JSON serialization.
#[test]
fn edge_kind_serializes_to_co_occurs_with() {
    let kind = EdgeKind::CoOccursWith;
    assert_eq!(kind.as_str(), "co_occurs_with");

    let encoded = serde_json::to_string(&kind).unwrap();
    // JSON output is the quoted string form.
    assert_eq!(encoded, "\"co_occurs_with\"");
}
/// `EdgeKind::SameSessionAs` must render as `same_session_as` both through
/// `as_str` and through its JSON serialization.
#[test]
fn edge_kind_serializes_to_same_session_as() {
    let kind = EdgeKind::SameSessionAs;
    assert_eq!(kind.as_str(), "same_session_as");

    let encoded = serde_json::to_string(&kind).unwrap();
    // JSON output is the quoted string form.
    assert_eq!(encoded, "\"same_session_as\"");
}
/// `shared_projects` must be absent on session neighbors both when
/// `with_projects` is left at its default and when it is `Some(0)`.
///
/// `#topic` and `@shared` live in different chunks of the same session `s1`,
/// so they are related only through the session.
#[test]
fn entity_session_neighbors_project_evidence_is_omitted_by_default_and_zero() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    let conn = store.conn_mut();
    let tx = conn.transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_projects_default_src', 'mem://session-projects-default', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    for (ord, (id, session, project)) in [
        ("c1", Some("s1"), Some("alpha")),
        ("c2", Some("s1"), Some("beta")),
    ]
    .into_iter()
    .enumerate()
    {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'session_projects_default_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ord as i64, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // Borrow each entity as a one-element slice rather than cloning it into a
    // temporary array, matching the sibling user-evidence tests.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c2", std::slice::from_ref(&shared), 200);
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");
    let default_report = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            kind: Some(EntityKind::Mention),
            ..Default::default()
        },
    )
    .unwrap();
    assert_eq!(default_report.neighbors.len(), 1);
    assert!(default_report.neighbors[0].shared_projects.is_none());
    let zeroed = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            kind: Some(EntityKind::Mention),
            with_projects: Some(0),
            ..Default::default()
        },
    )
    .unwrap();
    assert!(zeroed.neighbors[0].shared_projects.is_none());
}
/// Requests at most two projects of evidence for the session neighbor
/// `@shared` of `#topic` and expects exactly `alpha`, `beta`.
///
/// The fixture mixes sessions `s1`..`s3`, lists projects out of alphabetical
/// order, and includes one chunk with no project.
#[test]
fn entity_session_neighbors_project_evidence_orders_and_truncates_deterministically() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    let conn = store.conn_mut();
    let tx = conn.transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_projects_src', 'mem://session-projects', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    for (ord, (id, session, project)) in [
        ("c1", Some("s2"), Some("gamma")),
        ("c2", Some("s1"), Some("alpha")),
        ("c3", Some("s1"), Some("beta")),
        ("c4", Some("s3"), Some("delta")),
        ("c5", Some("s2"), None),
    ]
    .into_iter()
    .enumerate()
    {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'session_projects_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ord as i64, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // Every chunk carries a single entity; borrow it as a one-element slice
    // instead of cloning, matching the sibling user-evidence tests.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c2", std::slice::from_ref(&topic), 200);
    link(&mut store, "c3", std::slice::from_ref(&shared), 300);
    link(&mut store, "c4", std::slice::from_ref(&shared), 400);
    link(&mut store, "c5", std::slice::from_ref(&shared), 500);
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");
    let report = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            kind: Some(EntityKind::Mention),
            with_projects: Some(2),
            ..Default::default()
        },
    )
    .unwrap();
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(
        report.neighbors[0].shared_projects.as_ref(),
        Some(&vec!["alpha".to_string(), "beta".to_string()]),
    );
}
/// With `session_id: Some("s1")`, the project evidence for the `@shared`
/// session neighbor must be `alpha`, `beta`; `gamma` (the `s2` chunk that
/// carries both entities) is expected to be excluded.
#[test]
fn entity_session_neighbors_project_evidence_honors_session_scope_without_collapsing() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    let conn = store.conn_mut();
    let tx = conn.transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_scope_src', 'mem://session-scope', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    for (ord, (id, session, project)) in [
        ("c1", Some("s1"), Some("alpha")),
        ("c2", Some("s1"), Some("beta")),
        ("c3", Some("s2"), Some("gamma")),
    ]
    .into_iter()
    .enumerate()
    {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'session_scope_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![id, ord as i64, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let shared = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // Single-entity chunks borrow via `from_ref` (no clone); the final link is
    // the last use of both entities, so they are moved into the array.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c2", std::slice::from_ref(&shared), 200);
    link(&mut store, "c3", &[topic, shared], 300);
    let topic_id = entity_id(EntityKind::Hashtag, "#topic");
    let report = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            kind: Some(EntityKind::Mention),
            session_id: Some("s1".into()),
            with_projects: Some(5),
            ..Default::default()
        },
    )
    .unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(
        report.neighbors[0].shared_projects.as_ref(),
        Some(&vec!["alpha".to_string(), "beta".to_string()]),
    );
}
/// `shared_users` must be absent on session neighbors both when `with_users`
/// is left at its default and when it is `Some(0)`.
#[test]
fn entity_session_neighbors_user_evidence_is_omitted_by_default_and_zero() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_users_default_src', 'mem://session-users-default', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", Some("s1"), Some("alice")),
        ("c2", Some("s1"), Some("bob")),
    ];
    for ((chunk_id, session, user), ordinal) in rows.into_iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'session_users_default_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let mention = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // The two entities sit in different chunks of the same session.
    for (chunk_id, entity, num) in [("c1", &hashtag, 100), ("c2", &mention, 200)] {
        link(&mut store, chunk_id, std::slice::from_ref(entity), num);
    }
    let source_id = entity_id(EntityKind::Hashtag, "#topic");

    // Case 1: option not mentioned at all.
    let default_opts = EntitySessionNeighborsOptions {
        kind: Some(EntityKind::Mention),
        ..Default::default()
    };
    let omitted = entity_session_neighbors(store.conn(), &source_id, &default_opts).unwrap();
    assert_eq!(omitted.neighbors.len(), 1);
    assert!(omitted.neighbors[0].shared_users.is_none());

    // Case 2: option explicitly zero.
    let zero_opts = EntitySessionNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_users: Some(0),
        ..Default::default()
    };
    let zeroed = entity_session_neighbors(store.conn(), &source_id, &zero_opts).unwrap();
    assert!(zeroed.neighbors[0].shared_users.is_none());
}
/// Requests at most two users of evidence for the session neighbor `@shared`
/// of `#topic` and expects exactly `alice`, `bob`.
///
/// The fixture lists users out of alphabetical order across sessions `s1` and
/// `s2`, and includes one chunk with no user.
#[test]
fn entity_session_neighbors_user_evidence_orders_and_truncates_deterministically() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_users_order_src', 'mem://session-users-order', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", Some("s1"), Some("carol")),
        ("c2", Some("s1"), Some("alice")),
        ("c3", Some("s2"), Some("bob")),
        ("c4", Some("s2"), None),
    ];
    for ((chunk_id, session, user), ordinal) in rows.into_iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'session_users_order_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let mention = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // Alternate #topic / @shared over the four chunks, one entity per chunk.
    let links = [
        ("c1", &hashtag, 100),
        ("c2", &mention, 200),
        ("c3", &hashtag, 300),
        ("c4", &mention, 400),
    ];
    for (chunk_id, entity, num) in links {
        link(&mut store, chunk_id, std::slice::from_ref(entity), num);
    }

    let source_id = entity_id(EntityKind::Hashtag, "#topic");
    let opts = EntitySessionNeighborsOptions {
        kind: Some(EntityKind::Mention),
        with_users: Some(2),
        ..Default::default()
    };
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let expected = vec!["alice".to_string(), "bob".to_string()];
    assert_eq!(report.neighbors[0].shared_users.as_ref(), Some(&expected));
}
/// With `session_id: Some("s1")`, the user evidence for the `@shared` session
/// neighbor must be `alice`, `bob`; the `s2` users (`carol`, `dave`) are
/// expected to be excluded.
#[test]
fn entity_session_neighbors_user_evidence_honors_session_scope_without_collapsing() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('session_users_scope_src', 'mem://session-users-scope', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", Some("s1"), Some("alice")),
        ("c2", Some("s1"), Some("bob")),
        ("c3", Some("s2"), Some("carol")),
        ("c4", Some("s2"), Some("dave")),
    ];
    for ((chunk_id, session, user), ordinal) in rows.into_iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'session_users_scope_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let mention = Entity {
        kind: EntityKind::Mention,
        value: "@shared".into(),
    };
    // Alternate #topic / @shared over the four chunks, one entity per chunk.
    let links = [
        ("c1", &hashtag, 100),
        ("c2", &mention, 200),
        ("c3", &hashtag, 300),
        ("c4", &mention, 400),
    ];
    for (chunk_id, entity, num) in links {
        link(&mut store, chunk_id, std::slice::from_ref(entity), num);
    }

    let source_id = entity_id(EntityKind::Hashtag, "#topic");
    let opts = EntitySessionNeighborsOptions {
        kind: Some(EntityKind::Mention),
        session_id: Some("s1".into()),
        with_users: Some(5),
        ..Default::default()
    };
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.session_id.as_deref(), Some("s1"));
    assert_eq!(report.neighbors.len(), 1);
    let expected = vec!["alice".to_string(), "bob".to_string()];
    assert_eq!(report.neighbors[0].shared_users.as_ref(), Some(&expected));
}
/// `#topic` appears in sessions `s1` and `s2`. `@frequent` appears in both,
/// `@sometimes` only in `s2`, and the URL entity only in `s3` and a
/// session-less chunk. The report must list `@frequent` (2 shared sessions)
/// before `@sometimes` (1), exclude the URL, and never include the source.
#[test]
fn entity_session_neighbors_ranks_by_shared_session_count() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    seed_chunks_with_sessions(
        &mut store,
        &[
            ("c1", Some("s1")),
            ("c2", Some("s1")),
            ("c3", Some("s2")),
            ("c4", Some("s2")),
            ("c5", Some("s3")),
            ("c6", None),
        ],
    );
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let frequent = Entity {
        kind: EntityKind::Mention,
        value: "@frequent".into(),
    };
    let sometimes = Entity {
        kind: EntityKind::Mention,
        value: "@sometimes".into(),
    };
    let unrelated = Entity {
        kind: EntityKind::Url,
        value: "https://unrelated.test".into(),
    };
    // Each call links exactly one entity, so borrow it as a one-element slice
    // instead of cloning it into a temporary array.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c3", std::slice::from_ref(&topic), 200);
    link(&mut store, "c2", std::slice::from_ref(&frequent), 150);
    link(&mut store, "c4", std::slice::from_ref(&frequent), 250);
    link(&mut store, "c4", std::slice::from_ref(&sometimes), 260);
    link(&mut store, "c5", std::slice::from_ref(&unrelated), 300);
    link(&mut store, "c6", std::slice::from_ref(&unrelated), 310);
    let topic_id = entity_id(topic.kind, &topic.value);
    let report = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions::default(),
    )
    .unwrap();
    assert_eq!(report.source_value, "#topic");
    assert_eq!(report.source_session_count, 2);
    assert_eq!(report.total_neighbors, 2);
    assert_eq!(report.neighbors.len(), 2);
    assert_eq!(report.neighbors[0].value, "@frequent");
    assert_eq!(report.neighbors[0].edge_kind, EdgeKind::SameSessionAs);
    assert_eq!(report.neighbors[0].shared_sessions, 2);
    assert_eq!(report.neighbors[0].session_count, 2);
    assert_eq!(report.neighbors[1].value, "@sometimes");
    assert_eq!(report.neighbors[1].shared_sessions, 1);
    assert_eq!(report.neighbors[1].session_count, 1);
    // The source entity must not be reported as its own neighbor.
    assert!(report.neighbors.iter().all(|n| n.id != topic_id));
}
/// Scoping to `s2` must report one source session and both `s2` mentions with
/// per-neighbor counts of 1; scoping to a session id that was never seeded
/// must yield an empty report that still echoes the requested id.
#[test]
fn entity_session_neighbors_honors_session_id_scope() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    seed_chunks_with_sessions(
        &mut store,
        &[
            ("c1", Some("s1")),
            ("c2", Some("s1")),
            ("c3", Some("s2")),
            ("c4", Some("s2")),
        ],
    );
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let frequent = Entity {
        kind: EntityKind::Mention,
        value: "@frequent".into(),
    };
    let only_s2 = Entity {
        kind: EntityKind::Mention,
        value: "@only-s2".into(),
    };
    // Single-entity links borrow via `from_ref`; the last link is the final
    // use of both mentions, so they are moved rather than cloned.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c2", std::slice::from_ref(&frequent), 110);
    link(&mut store, "c3", std::slice::from_ref(&topic), 200);
    link(&mut store, "c4", &[frequent, only_s2], 210);
    let topic_id = entity_id(topic.kind, &topic.value);
    let report = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            session_id: Some("s2".into()),
            ..Default::default()
        },
    )
    .unwrap();
    assert_eq!(report.session_id.as_deref(), Some("s2"));
    assert_eq!(report.source_session_count, 1);
    assert_eq!(report.total_neighbors, 2);
    assert_eq!(report.neighbors.len(), 2);
    assert!(report.neighbors.iter().all(|n| n.shared_sessions == 1));
    assert!(report.neighbors.iter().all(|n| n.session_count == 1));
    let values: Vec<&str> = report.neighbors.iter().map(|n| n.value.as_str()).collect();
    assert_eq!(values, vec!["@frequent", "@only-s2"]);
    // A session id that matches no chunk produces an empty, non-error report.
    let missing = entity_session_neighbors(
        store.conn(),
        &topic_id,
        &EntitySessionNeighborsOptions {
            session_id: Some("missing".into()),
            ..Default::default()
        },
    )
    .unwrap();
    assert_eq!(missing.session_id.as_deref(), Some("missing"));
    assert_eq!(missing.source_session_count, 0);
    assert_eq!(missing.total_neighbors, 0);
    assert!(missing.neighbors.is_empty());
}
/// `#a` and `@b` sit in different chunks of the same session: the chunk-level
/// `entity_neighbors` report must be empty while `entity_session_neighbors`
/// must report `@b` with a `SameSessionAs` edge.
#[test]
fn entity_session_neighbors_surfaces_session_overlap_without_chunk_overlap() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    seed_chunks_with_sessions(&mut store, &[("c1", Some("s1")), ("c2", Some("s1"))]);
    let a = Entity {
        kind: EntityKind::Hashtag,
        value: "#a".into(),
    };
    let b = Entity {
        kind: EntityKind::Mention,
        value: "@b".into(),
    };
    // One entity per chunk; borrow as one-element slices instead of cloning.
    link(&mut store, "c1", std::slice::from_ref(&a), 100);
    link(&mut store, "c2", std::slice::from_ref(&b), 200);
    let a_id = entity_id(a.kind, &a.value);
    let chunk_report =
        entity_neighbors(store.conn(), &a_id, &EntityNeighborsOptions::default()).unwrap();
    assert!(chunk_report.neighbors.is_empty(), "no chunk co-occurrence");
    let session_report = entity_session_neighbors(
        store.conn(),
        &a_id,
        &EntitySessionNeighborsOptions::default(),
    )
    .unwrap();
    assert_eq!(session_report.neighbors.len(), 1);
    assert_eq!(session_report.neighbors[0].value, "@b");
    assert_eq!(session_report.neighbors[0].shared_sessions, 1);
    assert_eq!(
        session_report.neighbors[0].edge_kind,
        EdgeKind::SameSessionAs
    );
}
/// When every chunk is seeded without a session, the session-neighbor report
/// for `#topic` must be empty even though `@other` shares both chunks with it.
#[test]
fn entity_session_neighbors_excludes_null_session_chunks() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_sessions(&mut store, &[("c1", None), ("c2", None)]);

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@other".into(),
        },
    ];
    link(&mut store, "c1", &pair, 100);
    link(&mut store, "c2", &pair, 200);

    let opts = EntitySessionNeighborsOptions::default();
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.source_session_count, 0);
    assert_eq!(report.total_neighbors, 0);
    assert!(report.neighbors.is_empty());
}
/// With `kind: Some(EntityKind::Url)`, only the URL entity sharing `#topic`'s
/// session may be reported; the mention in the same chunk must be filtered
/// out, and the report must echo the filter in `kind_filter`.
#[test]
fn entity_session_neighbors_filters_by_kind() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    seed_chunks_with_sessions(&mut store, &[("c1", Some("s1")), ("c2", Some("s1"))]);
    let topic = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let mention = Entity {
        kind: EntityKind::Mention,
        value: "@friend".into(),
    };
    let url = Entity {
        kind: EntityKind::Url,
        value: "https://example.test/x".into(),
    };
    // `topic` is still needed for its id, so borrow it; `mention` and `url`
    // are not used again and can be moved into the slice without cloning.
    link(&mut store, "c1", std::slice::from_ref(&topic), 100);
    link(&mut store, "c2", &[mention, url], 200);
    let topic_id = entity_id(topic.kind, &topic.value);
    let opts = EntitySessionNeighborsOptions {
        kind: Some(EntityKind::Url),
        ..Default::default()
    };
    let report = entity_session_neighbors(store.conn(), &topic_id, &opts).unwrap();
    assert_eq!(report.total_neighbors, 1);
    assert_eq!(report.neighbors.len(), 1);
    assert_eq!(report.neighbors[0].kind, EntityKind::Url);
    assert_eq!(report.neighbors[0].value, "https://example.test/x");
    assert_eq!(report.kind_filter, Some(EntityKind::Url));
}
/// With four session neighbors and `limit: Some(2)`, the report must return
/// two neighbors while `total_neighbors` still reports all four.
#[test]
fn entity_session_neighbors_limit_truncates_but_total_is_full() {
    let dir = tempdir().unwrap();
    let mut store = Store::initialize(&dir.path().join("store")).unwrap();
    seed_chunks_with_sessions(
        &mut store,
        &[
            ("c1", Some("s1")),
            ("c2", Some("s1")),
            ("c3", Some("s1")),
            ("c4", Some("s1")),
            ("c5", Some("s1")),
        ],
    );
    let hub = Entity {
        kind: EntityKind::Hashtag,
        value: "#hub".into(),
    };
    // `hub` is needed again for its id below, so borrow rather than clone.
    link(&mut store, "c1", std::slice::from_ref(&hub), 100);
    // Chunks c2..c5 each get a distinct URL entity n0..n3. Iterating as i64
    // avoids a cast when computing the fourth `link` argument.
    for i in 0_i64..4 {
        let n = Entity {
            kind: EntityKind::Url,
            value: format!("https://n{i}.test"),
        };
        link(&mut store, &format!("c{}", i + 2), &[n], 200 + i);
    }
    let hub_id = entity_id(hub.kind, &hub.value);
    let opts = EntitySessionNeighborsOptions {
        limit: Some(2),
        ..Default::default()
    };
    let report = entity_session_neighbors(store.conn(), &hub_id, &opts).unwrap();
    assert_eq!(report.total_neighbors, 4);
    assert_eq!(report.neighbors.len(), 2);
}
/// Chunks seeded with sessions only (no project supplied by this test) must
/// yield a neighbor whose `project_count` is 0 while its session counts are 2.
#[test]
fn entity_session_neighbors_project_count_is_zero_for_chunks_without_project() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_sessions(&mut store, &[("c1", Some("s1")), ("c2", Some("s2"))]);

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@friend".into(),
        },
    ];
    link(&mut store, "c1", &pair, 100);
    link(&mut store, "c2", &pair, 200);

    let opts = EntitySessionNeighborsOptions::default();
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let neighbor = &report.neighbors[0];
    assert_eq!(neighbor.shared_sessions, 2);
    assert_eq!(neighbor.session_count, 2);
    assert_eq!(neighbor.project_count, 0);
}
/// Four chunks in four sessions carry projects `alpha`, `beta`, `beta` and
/// none; the neighbor's `project_count` must be 2 (distinct, non-null).
#[test]
fn entity_session_neighbors_project_count_counts_distinct_projects_with_mixed_nulls() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", Some("alpha")),
        ("c2", "s2", Some("beta")),
        ("c3", "s3", Some("beta")),
        ("c4", "s4", None),
    ];
    for ((chunk_id, session, project), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@popular".into(),
        },
    ];
    // Both entities are attached to every chunk.
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &pair, num);
    }

    let opts = EntitySessionNeighborsOptions::default();
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let neighbor = &report.neighbors[0];
    assert_eq!(neighbor.shared_sessions, 4);
    assert_eq!(neighbor.session_count, 4);
    assert_eq!(
        neighbor.project_count, 2,
        "popular spans projects alpha + beta; the NULL-project chunk must not count",
    );
}
/// Unscoped, the neighbor must report 2 sessions and 3 projects; scoped to
/// `s1` it must report 1 session and still 2 distinct projects (`alpha`,
/// `beta`) rather than collapsing to one.
#[test]
fn entity_session_neighbors_project_count_is_independent_of_session_filter() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", "alpha"),
        ("c2", "s1", "alpha"),
        ("c3", "s1", "beta"),
        ("c4", "s2", "gamma"),
    ];
    for ((chunk_id, session, project), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, project, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, project],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@popular".into(),
        },
    ];
    // Both entities are attached to every chunk.
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &pair, num);
    }

    // No session filter.
    let global_opts = EntitySessionNeighborsOptions::default();
    let global = entity_session_neighbors(store.conn(), &source_id, &global_opts).unwrap();
    assert_eq!(global.neighbors.len(), 1);
    assert_eq!(global.neighbors[0].session_count, 2);
    assert_eq!(global.neighbors[0].project_count, 3);

    // Restricted to s1.
    let scoped_opts = EntitySessionNeighborsOptions {
        session_id: Some("s1".into()),
        ..Default::default()
    };
    let scoped = entity_session_neighbors(store.conn(), &source_id, &scoped_opts).unwrap();
    assert_eq!(scoped.neighbors.len(), 1);
    assert_eq!(scoped.neighbors[0].session_count, 1);
    assert_eq!(
        scoped.neighbors[0].project_count, 2,
        "project grouping must not collapse under a session_id filter",
    );
}
/// Chunks seeded with sessions only (no user supplied by this test) must
/// yield a neighbor whose `user_count` is 0 while its session counts are 2.
#[test]
fn entity_session_neighbors_user_count_is_zero_for_chunks_without_user() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();
    seed_chunks_with_sessions(&mut store, &[("c1", Some("s1")), ("c2", Some("s2"))]);

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@friend".into(),
        },
    ];
    link(&mut store, "c1", &pair, 100);
    link(&mut store, "c2", &pair, 200);

    let opts = EntitySessionNeighborsOptions::default();
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let neighbor = &report.neighbors[0];
    assert_eq!(neighbor.shared_sessions, 2);
    assert_eq!(neighbor.session_count, 2);
    assert_eq!(neighbor.user_count, 0);
}
/// Four chunks in four sessions carry users `alice`, `bob`, `bob` and none;
/// the neighbor's `user_count` must be 2 (distinct, non-null).
#[test]
fn entity_session_neighbors_user_count_counts_distinct_users_with_mixed_nulls() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", Some("alice")),
        ("c2", "s2", Some("bob")),
        ("c3", "s3", Some("bob")),
        ("c4", "s4", None),
    ];
    for ((chunk_id, session, user), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@popular".into(),
        },
    ];
    // Both entities are attached to every chunk.
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &pair, num);
    }

    let opts = EntitySessionNeighborsOptions::default();
    let report = entity_session_neighbors(store.conn(), &source_id, &opts).unwrap();

    assert_eq!(report.neighbors.len(), 1);
    let neighbor = &report.neighbors[0];
    assert_eq!(neighbor.shared_sessions, 4);
    assert_eq!(neighbor.session_count, 4);
    assert_eq!(
        neighbor.user_count, 2,
        "popular spans users alice + bob; the NULL-user chunk must not count",
    );
}
/// Unscoped, the neighbor must report 2 sessions and 3 users; scoped to `s1`
/// it must report 1 session and still 2 distinct users (`alice`, `bob`) rather
/// than collapsing to one.
#[test]
fn entity_session_neighbors_user_count_is_independent_of_session_filter() {
    let tmp = tempdir().unwrap();
    let mut store = Store::initialize(&tmp.path().join("store")).unwrap();

    let tx = store.conn_mut().transaction().unwrap();
    tx.execute(
        "INSERT INTO sources (id, uri, path, kind, bytes, content_sha256, mtime_unix, ingested_at)
VALUES ('list_src', 'mem://list', NULL, 'text/plain', 0, 'deadbeef', NULL, 0)",
        [],
    )
    .unwrap();
    let rows = [
        ("c1", "s1", "alice"),
        ("c2", "s1", "alice"),
        ("c3", "s1", "bob"),
        ("c4", "s2", "carol"),
    ];
    for ((chunk_id, session, user), ordinal) in rows.iter().zip(0_i64..) {
        tx.execute(
            "INSERT INTO chunks (id, source_id, ordinal, byte_start, byte_end, char_count, text, sha256, session_id, user, created_at)
VALUES (?1, 'list_src', ?2, 0, 0, 0, '', '', ?3, ?4, 0)",
            params![chunk_id, ordinal, session, user],
        )
        .unwrap();
    }
    tx.commit().unwrap();

    let hashtag = Entity {
        kind: EntityKind::Hashtag,
        value: "#topic".into(),
    };
    let source_id = entity_id(hashtag.kind, &hashtag.value);
    let pair = [
        hashtag,
        Entity {
            kind: EntityKind::Mention,
            value: "@popular".into(),
        },
    ];
    // Both entities are attached to every chunk.
    for (chunk_id, num) in [("c1", 100), ("c2", 200), ("c3", 300), ("c4", 400)] {
        link(&mut store, chunk_id, &pair, num);
    }

    // No session filter.
    let global_opts = EntitySessionNeighborsOptions::default();
    let global = entity_session_neighbors(store.conn(), &source_id, &global_opts).unwrap();
    assert_eq!(global.neighbors.len(), 1);
    assert_eq!(global.neighbors[0].session_count, 2);
    assert_eq!(global.neighbors[0].user_count, 3);

    // Restricted to s1.
    let scoped_opts = EntitySessionNeighborsOptions {
        session_id: Some("s1".into()),
        ..Default::default()
    };
    let scoped = entity_session_neighbors(store.conn(), &source_id, &scoped_opts).unwrap();
    assert_eq!(scoped.neighbors.len(), 1);
    assert_eq!(scoped.neighbors[0].session_count, 1);
    assert_eq!(
        scoped.neighbors[0].user_count, 2,
        "user grouping must not collapse under a session_id filter",
    );
}
/// Looking up session neighbors for an id that is not in a freshly
/// initialized store must fail with a message containing `no entity with id`.
#[test]
fn entity_session_neighbors_errors_on_unknown_id() {
    let tmp = tempdir().unwrap();
    let store = Store::initialize(&tmp.path().join("store")).unwrap();

    let opts = EntitySessionNeighborsOptions::default();
    let message = entity_session_neighbors(store.conn(), "nope", &opts)
        .unwrap_err()
        .to_string();

    assert!(message.contains("no entity with id"));
}
/// Every `EntityKind` must survive `as_str` -> `entity_kind_from_str`; the
/// parser must also accept the mixed-case spellings checked below and reject
/// an unknown name.
#[test]
fn entity_kind_from_str_round_trips_known_kinds() {
    let every_kind = [
        EntityKind::Url,
        EntityKind::Repo,
        EntityKind::Domain,
        EntityKind::Email,
        EntityKind::FilePath,
        EntityKind::Mention,
        EntityKind::Hashtag,
    ];
    for kind in every_kind {
        assert_eq!(entity_kind_from_str(kind.as_str()).unwrap(), kind);
    }

    // Spellings that differ from `as_str` only by letter case.
    for (raw, expected) in [("URL", EntityKind::Url), ("Domain", EntityKind::Domain)] {
        assert_eq!(entity_kind_from_str(raw).unwrap(), expected);
    }

    assert!(entity_kind_from_str("nope").is_err());
}
}