use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use serde::Serialize;
use crate::inspect::ago;
/// Options for [`list_sessions`].
#[derive(Debug, Clone, Default)]
pub struct SessionListOptions {
    /// Maximum number of sessions to return; `None` returns every session.
    pub limit: Option<usize>,
}

/// Aggregate statistics for one non-NULL `session_id` in the `chunks` table,
/// as produced by [`list_sessions`].
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct SessionEntry {
    /// The session identifier.
    pub session_id: String,
    /// Number of chunks tagged with this session id.
    pub chunk_count: i64,
    /// Number of distinct `source_id` values among those chunks.
    pub source_count: i64,
    /// Earliest `timestamp_unix` in the session; `None` if no chunk has one.
    pub first_timestamp_unix: Option<i64>,
    /// Latest `timestamp_unix` in the session; `None` if no chunk has one.
    pub last_timestamp_unix: Option<i64>,
    /// Project shared by every chunk of the session. `None` when the chunks
    /// disagree or any chunk has no project; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project: Option<String>,
    /// User shared by every chunk (same all-or-nothing rule as `project`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    /// Topic shared by every chunk (same all-or-nothing rule as `project`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub topic: Option<String>,
    /// Thread shared by every chunk (same all-or-nothing rule as `project`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thread: Option<String>,
}

/// Result of [`list_sessions`].
#[derive(Debug, Clone, Serialize)]
pub struct SessionListReport {
    /// Sessions ordered by chunk count (descending) then session id
    /// (ascending), truncated to the requested limit.
    pub entries: Vec<SessionEntry>,
    /// Count of all distinct non-NULL session ids, ignoring the limit.
    pub total_sessions: i64,
}
/// Lists the sessions recorded in the `chunks` table, largest first.
///
/// Each [`SessionEntry`] aggregates every chunk sharing one non-NULL
/// `session_id`. It carries the chunk count, the number of distinct sources,
/// and the earliest and latest `timestamp_unix`. The `project`, `user`,
/// `topic` and `thread` fields are reported only when every chunk in the
/// session carries the same non-NULL value. Any disagreement or missing
/// value yields `None`.
///
/// Entries are ordered by chunk count (descending), then session id
/// (ascending), and truncated to `opts.limit` when one is given.
/// `total_sessions` always counts every distinct session, ignoring the limit.
///
/// # Errors
///
/// Returns an error if either SQLite query fails to prepare or execute, or
/// if a row cannot be decoded into a [`SessionEntry`].
pub fn list_sessions(conn: &Connection, opts: &SessionListOptions) -> Result<SessionListReport> {
    // Number of distinct non-NULL session ids, independent of any limit.
    let total_sessions = conn
        .query_row(
            "SELECT COUNT(*) FROM (
                SELECT session_id FROM chunks
                WHERE session_id IS NOT NULL
                GROUP BY session_id
            )",
            [],
            |row| row.get::<_, i64>(0),
        )
        .context("counting distinct sessions")?;

    // SQLite binds LIMIT as a signed 64-bit integer. An absent limit, or one
    // too large for i64, becomes i64::MAX, which is effectively unbounded.
    let sql_limit = i64::try_from(opts.limit.unwrap_or(usize::MAX)).unwrap_or(i64::MAX);

    let mut stmt = conn
        .prepare(
            "SELECT session_id,
                    COUNT(*) AS chunk_count,
                    COUNT(DISTINCT source_id) AS source_count,
                    MIN(timestamp_unix) AS first_ts,
                    MAX(timestamp_unix) AS last_ts,
                    -- Representative project: surface a non-null value only
                    -- when every chunk in the session agrees on the same
                    -- project. `COUNT(project)` counts non-null rows; if it
                    -- equals the chunk count and MIN == MAX, all chunks
                    -- share one non-null project. Anything else (mixed
                    -- values or any nulls) collapses to NULL.
                    CASE
                        WHEN COUNT(project) = COUNT(*)
                             AND MIN(project) = MAX(project)
                        THEN MIN(project)
                        ELSE NULL
                    END AS project,
                    -- Same all-or-nothing rule for `user`.
                    CASE
                        WHEN COUNT(user) = COUNT(*)
                             AND MIN(user) = MAX(user)
                        THEN MIN(user)
                        ELSE NULL
                    END AS user,
                    -- Same all-or-nothing rule for `topic`.
                    CASE
                        WHEN COUNT(topic) = COUNT(*)
                             AND MIN(topic) = MAX(topic)
                        THEN MIN(topic)
                        ELSE NULL
                    END AS topic,
                    -- Same all-or-nothing rule for `thread`.
                    CASE
                        WHEN COUNT(thread) = COUNT(*)
                             AND MIN(thread) = MAX(thread)
                        THEN MIN(thread)
                        ELSE NULL
                    END AS thread
             FROM chunks
             WHERE session_id IS NOT NULL
             GROUP BY session_id
             ORDER BY chunk_count DESC, session_id ASC
             LIMIT ?1",
        )
        .context("preparing list_sessions query")?;

    // Column positions follow the SELECT list above.
    let entries = stmt
        .query_map([sql_limit], |row| {
            Ok(SessionEntry {
                session_id: row.get(0)?,
                chunk_count: row.get(1)?,
                source_count: row.get(2)?,
                first_timestamp_unix: row.get(3)?,
                last_timestamp_unix: row.get(4)?,
                project: row.get(5)?,
                user: row.get(6)?,
                topic: row.get(7)?,
                thread: row.get(8)?,
            })
        })
        .context("running list_sessions query")?
        .collect::<Result<Vec<_>, _>>()?;

    Ok(SessionListReport {
        entries,
        total_sessions,
    })
}
/// Prints a [`SessionListReport`] to stdout as human-readable text.
///
/// The first line is a header with the number of sessions shown and the
/// overall total. Each session then gets one line with its chunk and source
/// counts and its time span relative to now. The line also includes any
/// project/user/topic/thread shared by every chunk.
///
/// "no sessions found" is printed only when the database really has no
/// sessions. A report emptied by `limit = 0` still prints the header, so the
/// non-zero total is not hidden behind a false "no sessions" message.
pub fn print_text(report: &SessionListReport) {
    if report.entries.is_empty() && report.total_sessions == 0 {
        println!("no sessions found");
        return;
    }
    println!(
        "{} session{} ({} total)",
        report.entries.len(),
        if report.entries.len() == 1 { "" } else { "s" },
        report.total_sessions,
    );
    let now = crate::inspect::now_unix();
    // Renders an optional attribute as ` key=value`, or as nothing when absent.
    let tag = |key: &str, value: Option<&str>| -> String {
        value.map(|v| format!(" {key}={v}")).unwrap_or_default()
    };
    for entry in &report.entries {
        // A single instant collapses to `at=`; otherwise show both ends.
        let span = match (entry.first_timestamp_unix, entry.last_timestamp_unix) {
            (Some(first), Some(last)) if first == last => format!(" at={}", ago(now, first)),
            (Some(first), Some(last)) => {
                format!(" first={} last={}", ago(now, first), ago(now, last))
            }
            _ => String::new(),
        };
        println!(
            " {session:<32} chunks={chunks:<4} sources={sources:<3}{span}{project}{user}{topic}{thread}",
            session = entry.session_id,
            chunks = entry.chunk_count,
            sources = entry.source_count,
            project = tag("project", entry.project.as_deref()),
            user = tag("user", entry.user.as_deref()),
            topic = tag("topic", entry.topic.as_deref()),
            thread = tag("thread", entry.thread.as_deref()),
        );
    }
}
/// Writes a [`SessionListReport`] to stdout as pretty-printed JSON.
///
/// # Errors
///
/// Returns an error if the report cannot be serialized.
pub fn print_json(report: &SessionListReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
/// Options for [`related_sessions`].
#[derive(Debug, Clone, Default)]
pub struct RelatedSessionsOptions {
    /// Maximum number of related sessions to return; `None` means no limit.
    pub limit: Option<usize>,
    /// When `Some(n)` with `n > 0`, attach up to `n` shared entities to each
    /// returned session as evidence. `None` or `Some(0)` attaches nothing.
    pub with_entities: Option<usize>,
}

/// One entity, read from the `entities` table, that is linked to chunks of
/// both the source session and a related session.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RelatedSessionEntityEvidence {
    /// The entity's `kind` column.
    pub kind: String,
    /// The entity's `value` column.
    pub value: String,
}

/// A session that shares at least one entity with the source session of a
/// [`related_sessions`] query.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct RelatedSession {
    /// The related session's identifier (never the source session's).
    pub session_id: String,
    /// Number of distinct source-session entities that also appear in this
    /// session.
    pub shared_entity_count: i64,
    /// Number of this session's chunks that mention at least one shared entity.
    pub shared_chunk_count: i64,
    /// Total number of chunks in this session.
    pub chunk_count: i64,
    /// Earliest timestamp across the whole session; `None` if no chunk has one.
    pub first_timestamp_unix: Option<i64>,
    /// Latest timestamp across the whole session; `None` if no chunk has one.
    pub last_timestamp_unix: Option<i64>,
    /// Project shared by every chunk of the whole session (all-or-nothing);
    /// omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project: Option<String>,
    /// Shared-entity evidence. It is populated only when
    /// `RelatedSessionsOptions::with_entities` asks for it, and is omitted
    /// from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shared_entities: Option<Vec<RelatedSessionEntityEvidence>>,
}

/// Result of [`related_sessions`].
#[derive(Debug, Clone, Serialize)]
pub struct RelatedSessionsReport {
    /// The session the search started from.
    pub source_session_id: String,
    /// Number of chunks in the source session (always non-zero).
    pub source_chunk_count: i64,
    /// Number of distinct entities linked to the source session's chunks.
    pub source_entity_count: i64,
    /// Related sessions ordered by `shared_entity_count` (descending) then
    /// session id, truncated to the requested limit.
    pub sessions: Vec<RelatedSession>,
    /// Number of related sessions before the limit was applied.
    pub total_related: i64,
}
/// Finds sessions that share at least one entity with `source_session_id`.
///
/// A session is related when one of its chunks is linked (through
/// `chunk_entities`) to an entity that is also linked to some chunk of the
/// source session. Related sessions are ranked by the number of distinct
/// shared entities (descending), then by session id, and truncated to
/// `opts.limit`. `total_related` counts every related session regardless of
/// the limit.
///
/// When `opts.with_entities` is `Some(n)` with `n > 0`, each returned session
/// also carries up to `n` shared entities as evidence. Otherwise
/// `shared_entities` stays `None`.
///
/// # Errors
///
/// Fails if the source session has no chunks, or if any SQLite query fails.
pub fn related_sessions(
    conn: &Connection,
    source_session_id: &str,
    opts: &RelatedSessionsOptions,
) -> Result<RelatedSessionsReport> {
    let source_chunk_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM chunks WHERE session_id = ?1",
            params![source_session_id],
            |row| row.get(0),
        )
        .context("counting chunks in source session")?;
    if source_chunk_count == 0 {
        anyhow::bail!("no session with id '{source_session_id}'");
    }

    // Distinct entities mentioned anywhere in the source session.
    let source_entity_count: i64 = conn
        .query_row(
            "SELECT COUNT(DISTINCT ce.entity_id)
             FROM chunk_entities ce
             JOIN chunks c ON c.id = ce.chunk_id
             WHERE c.session_id = ?1",
            params![source_session_id],
            |row| row.get(0),
        )
        .context("counting distinct entities in source session")?;

    // Every related session, before any limit is applied.
    let total_related: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM (
                SELECT c.session_id
                FROM chunks c
                JOIN chunk_entities ce ON ce.chunk_id = c.id
                WHERE ce.entity_id IN (
                    SELECT DISTINCT ce2.entity_id
                    FROM chunk_entities ce2
                    JOIN chunks src ON src.id = ce2.chunk_id
                    WHERE src.session_id = ?1
                )
                  AND c.session_id IS NOT NULL
                  AND c.session_id != ?1
                GROUP BY c.session_id
            )",
            params![source_session_id],
            |row| row.get(0),
        )
        .context("counting related sessions")?;

    // SQLite binds LIMIT as i64; absent or oversized limits become i64::MAX.
    let sql_limit = i64::try_from(opts.limit.unwrap_or(usize::MAX)).unwrap_or(i64::MAX);

    let mut stmt = conn
        .prepare(
            "SELECT c.session_id AS session_id,
                    COUNT(DISTINCT ce.entity_id) AS shared_entity_count,
                    COUNT(DISTINCT c.id) AS shared_chunk_count,
                    (SELECT COUNT(*)
                       FROM chunks c2
                      WHERE c2.session_id = c.session_id) AS chunk_count,
                    (SELECT MIN(timestamp_unix)
                       FROM chunks c2
                      WHERE c2.session_id = c.session_id) AS first_ts,
                    (SELECT MAX(timestamp_unix)
                       FROM chunks c2
                      WHERE c2.session_id = c.session_id) AS last_ts,
                    -- Representative project for the related session: same
                    -- all-or-nothing rule as `list_sessions` — surface only
                    -- when every chunk in the session agrees on one non-null
                    -- project. Scoped to the full session, not just the
                    -- shared-entity-touching chunks, so the field matches
                    -- what `lantern sessions` would report.
                    (SELECT CASE
                                WHEN COUNT(project) = COUNT(*)
                                     AND MIN(project) = MAX(project)
                                THEN MIN(project)
                                ELSE NULL
                            END
                       FROM chunks c2
                      WHERE c2.session_id = c.session_id) AS project
             FROM chunks c
             JOIN chunk_entities ce ON ce.chunk_id = c.id
             WHERE ce.entity_id IN (
                 SELECT DISTINCT ce2.entity_id
                 FROM chunk_entities ce2
                 JOIN chunks src ON src.id = ce2.chunk_id
                 WHERE src.session_id = ?1
             )
               AND c.session_id IS NOT NULL
               AND c.session_id != ?1
             GROUP BY c.session_id
             ORDER BY shared_entity_count DESC, c.session_id ASC
             LIMIT ?2",
        )
        .context("preparing related_sessions query")?;

    let mut sessions = stmt
        .query_map(params![source_session_id, sql_limit], |row| {
            Ok(RelatedSession {
                session_id: row.get(0)?,
                shared_entity_count: row.get(1)?,
                shared_chunk_count: row.get(2)?,
                chunk_count: row.get(3)?,
                first_timestamp_unix: row.get(4)?,
                last_timestamp_unix: row.get(5)?,
                project: row.get(6)?,
                shared_entities: None,
            })
        })
        .context("running related_sessions query")?
        .collect::<Result<Vec<_>, _>>()?;

    // A zero (or absent) evidence limit means "don't attach evidence at all".
    let evidence_limit = opts.with_entities.unwrap_or(0);
    if evidence_limit > 0 {
        for session in sessions.iter_mut() {
            let evidence = load_related_session_entities(
                conn,
                source_session_id,
                &session.session_id,
                evidence_limit,
            )?;
            session.shared_entities = Some(evidence);
        }
    }

    Ok(RelatedSessionsReport {
        source_session_id: source_session_id.to_owned(),
        source_chunk_count,
        source_entity_count,
        sessions,
        total_related,
    })
}
/// Returns up to `limit` entities shared by two sessions.
///
/// An entity counts as shared when it is linked (via `chunk_entities`) to at
/// least one chunk of `source_session_id` and at least one chunk of
/// `related_session_id`. Results are ordered by how many of the related
/// session's chunks mention the entity (descending), then by kind and value.
///
/// # Errors
///
/// Returns an error if the query fails to prepare or execute, or a row
/// cannot be decoded.
fn load_related_session_entities(
    conn: &Connection,
    source_session_id: &str,
    related_session_id: &str,
    limit: usize,
) -> Result<Vec<RelatedSessionEntityEvidence>> {
    // SQLite binds LIMIT as i64; clamp anything larger to i64::MAX.
    let sql_limit = i64::try_from(limit).unwrap_or(i64::MAX);
    let mut stmt = conn
        .prepare(
            "SELECT e.kind,
                    e.value
             FROM chunk_entities ce
             JOIN entities e ON e.id = ce.entity_id
             JOIN chunks c ON c.id = ce.chunk_id
             WHERE c.session_id = ?2
               AND ce.entity_id IN (
                   SELECT DISTINCT ce2.entity_id
                   FROM chunk_entities ce2
                   JOIN chunks src ON src.id = ce2.chunk_id
                   WHERE src.session_id = ?1
               )
             GROUP BY e.id, e.kind, e.value
             ORDER BY COUNT(DISTINCT c.id) DESC, e.kind ASC, e.value ASC
             LIMIT ?3",
        )
        .context("preparing related-session shared-entity query")?;
    let evidence = stmt
        .query_map(
            params![source_session_id, related_session_id, sql_limit],
            |row| {
                let kind: String = row.get(0)?;
                let value: String = row.get(1)?;
                Ok(RelatedSessionEntityEvidence { kind, value })
            },
        )
        .context("running related-session shared-entity query")?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(evidence)
}
/// Prints a [`RelatedSessionsReport`] to stdout as human-readable text.
///
/// The header names the source session with its chunk and entity counts and
/// the total number of related sessions. Each returned session then gets one
/// line with its shared entity and chunk counts, total chunks, time span
/// relative to now, and project when unanimous. Its shared-entity evidence
/// follows when that was requested.
///
/// The "(no sessions share entities with this one)" note is printed only when
/// `total_related` is zero. A list emptied by `limit = 0` would otherwise
/// contradict the non-zero total already shown in the header.
pub fn print_related_text(report: &RelatedSessionsReport) {
    println!(
        "{} (chunks={} entities={}) — {} related session{}",
        report.source_session_id,
        report.source_chunk_count,
        report.source_entity_count,
        report.total_related,
        if report.total_related == 1 { "" } else { "s" },
    );
    if report.sessions.is_empty() {
        if report.total_related == 0 {
            println!(" (no sessions share entities with this one)");
        }
        return;
    }
    let now = crate::inspect::now_unix();
    for s in &report.sessions {
        // A single instant collapses to `at=`; otherwise show both ends.
        let span = match (s.first_timestamp_unix, s.last_timestamp_unix) {
            (Some(first), Some(last)) if first == last => format!(" at={}", ago(now, first)),
            (Some(first), Some(last)) => {
                format!(" first={} last={}", ago(now, first), ago(now, last))
            }
            _ => String::new(),
        };
        let project = s
            .project
            .as_deref()
            .map(|p| format!(" project={p}"))
            .unwrap_or_default();
        println!(
            " {session:<32} shared_entities={se:<3} shared_chunks={sc:<3} chunks={total:<4}{span}{project}",
            session = s.session_id,
            se = s.shared_entity_count,
            sc = s.shared_chunk_count,
            total = s.chunk_count,
        );
        // Evidence is present only when the caller asked for it.
        for entity in s.shared_entities.iter().flatten() {
            println!(
                " - [{kind}] {value}",
                kind = entity.kind,
                value = entity.value
            );
        }
    }
}
/// Writes a [`RelatedSessionsReport`] to stdout as pretty-printed JSON.
///
/// # Errors
///
/// Returns an error if the report cannot be serialized.
pub fn print_related_json(report: &RelatedSessionsReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
/// Options for [`temporally_related_sessions`].
#[derive(Debug, Clone, Default)]
pub struct TemporallyRelatedSessionsOptions {
    /// Maximum number of sessions to return; `None` means no truncation.
    pub limit: Option<usize>,
    /// When set, sessions whose gap from the source exceeds this many seconds
    /// are dropped. `None` keeps every session.
    pub window_secs: Option<i64>,
}

/// Another session scored by its temporal proximity to the source session.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct TemporallyRelatedSession {
    /// The other session's identifier.
    pub session_id: String,
    /// Seconds between the two sessions' timestamp ranges; 0 when the ranges
    /// touch or overlap.
    pub gap_secs: i64,
    /// Length in seconds of the ranges' intersection; 0 when they are disjoint.
    pub overlap_secs: i64,
    /// Total chunks in the session, including chunks without a timestamp.
    pub chunk_count: i64,
    /// Earliest non-NULL `timestamp_unix` in the session.
    pub first_timestamp_unix: i64,
    /// Latest non-NULL `timestamp_unix` in the session.
    pub last_timestamp_unix: i64,
}

/// Result of [`temporally_related_sessions`].
#[derive(Debug, Clone, Serialize)]
pub struct TemporallyRelatedSessionsReport {
    /// The session the search started from.
    pub source_session_id: String,
    /// Earliest non-NULL timestamp in the source session.
    pub source_first_timestamp_unix: i64,
    /// Latest non-NULL timestamp in the source session.
    pub source_last_timestamp_unix: i64,
    /// The window that was applied, echoed from the options.
    pub window_secs: Option<i64>,
    /// Sessions sorted by gap (ascending) then session id, truncated to the limit.
    pub sessions: Vec<TemporallyRelatedSession>,
    /// Number of sessions that passed the window filter, before the limit.
    pub total_related: i64,
}
pub fn temporally_related_sessions(
conn: &Connection,
source_session_id: &str,
opts: &TemporallyRelatedSessionsOptions,
) -> Result<TemporallyRelatedSessionsReport> {
let source_chunk_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM chunks WHERE session_id = ?1",
params![source_session_id],
|row| row.get(0),
)
.context("counting chunks in source session")?;
if source_chunk_count == 0 {
anyhow::bail!("no session with id '{source_session_id}'");
}
let (s_first, s_last): (Option<i64>, Option<i64>) = conn
.query_row(
"SELECT MIN(timestamp_unix), MAX(timestamp_unix)
FROM chunks
WHERE session_id = ?1 AND timestamp_unix IS NOT NULL",
params![source_session_id],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.context("aggregating source session timestamps")?;
let (source_first, source_last) = match (s_first, s_last) {
(Some(first), Some(last)) => (first, last),
_ => anyhow::bail!("session '{source_session_id}' has no chunks with a timestamp"),
};
let mut stmt = conn
.prepare(
"SELECT c.session_id,
(SELECT COUNT(*) FROM chunks c2 WHERE c2.session_id = c.session_id) AS chunk_count,
MIN(c.timestamp_unix) AS o_first,
MAX(c.timestamp_unix) AS o_last
FROM chunks c
WHERE c.session_id IS NOT NULL
AND c.session_id != ?1
AND c.timestamp_unix IS NOT NULL
GROUP BY c.session_id",
)
.context("preparing temporally_related_sessions query")?;
let rows = stmt
.query_map(params![source_session_id], |row| {
let session_id: String = row.get(0)?;
let chunk_count: i64 = row.get(1)?;
let o_first: i64 = row.get(2)?;
let o_last: i64 = row.get(3)?;
Ok((session_id, chunk_count, o_first, o_last))
})
.context("running temporally_related_sessions query")?;
let mut scored: Vec<TemporallyRelatedSession> = Vec::new();
for row in rows {
let (session_id, chunk_count, o_first, o_last) = row?;
let (gap_secs, overlap_secs) =
score_temporal_proximity((source_first, source_last), (o_first, o_last));
if let Some(window) = opts.window_secs
&& gap_secs > window
{
continue;
}
scored.push(TemporallyRelatedSession {
session_id,
gap_secs,
overlap_secs,
chunk_count,
first_timestamp_unix: o_first,
last_timestamp_unix: o_last,
});
}
scored.sort_by(|a, b| {
a.gap_secs
.cmp(&b.gap_secs)
.then_with(|| a.session_id.cmp(&b.session_id))
});
let total_related = scored.len() as i64;
if let Some(limit) = opts.limit {
scored.truncate(limit);
}
Ok(TemporallyRelatedSessionsReport {
source_session_id: source_session_id.to_string(),
source_first_timestamp_unix: source_first,
source_last_timestamp_unix: source_last,
window_secs: opts.window_secs,
sessions: scored,
total_related,
})
}
/// Scores how close two `(first, last)` timestamp ranges are in time.
///
/// Returns `(gap_secs, overlap_secs)`:
/// * disjoint ranges give `(distance between the nearest endpoints, 0)`;
/// * touching or overlapping ranges give `(0, length of the intersection)`.
///
/// Both ranges are expected to satisfy `first <= last`. Differences saturate
/// at `i64::MAX`, so extreme or corrupt timestamps cannot overflow. Plain
/// subtraction would panic in debug builds. In release builds it would wrap
/// to a negative, nonsensical gap.
fn score_temporal_proximity(src: (i64, i64), other: (i64, i64)) -> (i64, i64) {
    let (s_first, s_last) = src;
    let (o_first, o_last) = other;
    if o_last < s_first {
        // `other` ends before `src` starts.
        (s_first.saturating_sub(o_last), 0)
    } else if o_first > s_last {
        // `other` starts after `src` ends.
        (o_first.saturating_sub(s_last), 0)
    } else {
        // The ranges intersect; the intersection is [max(firsts), min(lasts)].
        let overlap = s_last.min(o_last).saturating_sub(s_first.max(o_first));
        (0, overlap.max(0))
    }
}
/// Prints a [`TemporallyRelatedSessionsReport`] to stdout as human-readable text.
///
/// The header shows the source session's time span relative to now, the
/// window if one was applied, and the total number of related sessions. Each
/// returned session then gets one line with its gap, overlap, chunk count and
/// time span.
///
/// The "(no other sessions in range)" note is printed only when
/// `total_related` is zero. A list emptied by `limit = 0` would otherwise
/// contradict the non-zero total already shown in the header.
pub fn print_temporally_related_text(report: &TemporallyRelatedSessionsReport) {
    let now = crate::inspect::now_unix();
    let window = match report.window_secs {
        Some(w) => format!(" window={w}s"),
        None => String::new(),
    };
    println!(
        "{src} (first={first} last={last}){window} — {total} related session{plural}",
        src = report.source_session_id,
        first = ago(now, report.source_first_timestamp_unix),
        last = ago(now, report.source_last_timestamp_unix),
        total = report.total_related,
        plural = if report.total_related == 1 { "" } else { "s" },
    );
    if report.sessions.is_empty() {
        if report.total_related == 0 {
            println!(" (no other sessions in range)");
        }
        return;
    }
    for s in &report.sessions {
        println!(
            " {session:<32} gap={gap:<7} overlap={overlap:<7} chunks={chunks:<4} first={first} last={last}",
            session = s.session_id,
            gap = format!("{}s", s.gap_secs),
            overlap = format!("{}s", s.overlap_secs),
            chunks = s.chunk_count,
            first = ago(now, s.first_timestamp_unix),
            last = ago(now, s.last_timestamp_unix),
        );
    }
}
/// Writes a [`TemporallyRelatedSessionsReport`] to stdout as pretty-printed JSON.
///
/// # Errors
///
/// Returns an error if the report cannot be serialized.
pub fn print_temporally_related_json(report: &TemporallyRelatedSessionsReport) -> Result<()> {
    let rendered = serde_json::to_string_pretty(report)?;
    println!("{rendered}");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::score_temporal_proximity;

    /// Overlapping (or merely touching) ranges report no gap and the length
    /// of their intersection.
    #[test]
    fn scoring_overlap_returns_zero_gap_and_intersection_length() {
        let cases = [
            ((10, 20), (15, 25), (0, 5)),
            ((10, 20), (12, 18), (0, 6)),
            ((10, 20), (10, 20), (0, 10)),
            ((10, 20), (20, 30), (0, 0)),
        ];
        for (src, other, expected) in cases {
            assert_eq!(score_temporal_proximity(src, other), expected);
        }
    }

    /// Disjoint ranges report the distance between the nearest endpoints and
    /// no overlap, on either side of the source range.
    #[test]
    fn scoring_disjoint_returns_gap_and_zero_overlap() {
        let cases = [
            ((100, 200), (10, 50), (50, 0)),
            ((100, 200), (300, 400), (100, 0)),
        ];
        for (src, other, expected) in cases {
            assert_eq!(score_temporal_proximity(src, other), expected);
        }
    }
}