use crate::provider::{ChatMessage, ContentPart, Role, UserInputKind};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use chrono::{DateTime, Duration, Local, NaiveDate, TimeZone, Timelike, Utc};
use sapphire_workspace::WorkspaceState;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tracing::warn;
use uuid::Uuid;
/// Identifies a conversation as `(room_id, thread_id)`; stored in
/// `SessionMeta::room_id` / `SessionMeta::thread_id`.
pub type ConversationKey = (String, Option<String>);
/// Session header, stored (wrapped as `{"meta": ...}`) on the first line of every
/// session file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
/// Session id; the session file is named `<session_id>.jsonl`.
pub session_id: String,
/// First element of the `ConversationKey`; empty for MCP and device-default sessions.
pub room_id: String,
/// Second element of the `ConversationKey`.
pub thread_id: Option<String>,
/// Originating channel name, e.g. `"mcp"`, `"device-default"` or `"rpc"`.
pub channel: String,
/// When the session was created (UTC).
pub created_at: DateTime<Utc>,
/// Public identifier; `SessionStore::ensure_session` only assigns one for the
/// `"rpc"` channel.
#[serde(skip_serializing_if = "Option::is_none")]
pub public_id: Option<String>,
/// Namespace directory the file lives under (`<base_dir>/<namespace>/<kind>/`).
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
/// Project name; set by `SessionStore::create_mcp_session`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub project: Option<String>,
/// Device id; set for sessions created by `SessionStore::find_or_create_for_device`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub device_id: Option<String>,
/// Room profile; set for sessions created by `SessionStore::find_or_create_for_device`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub room_profile: Option<String>,
/// Display title. Never written into the meta line; filled from the last
/// `session_title` line when the file is loaded.
#[serde(skip)]
pub title: Option<String>,
}
/// One persisted chat message: a JSONL line recognised by its `timestamp` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredMessage {
/// When the message was stored (UTC).
pub timestamp: DateTime<Utc>,
/// Message author role.
pub role: Role,
/// Message content; inline images are replaced by hash markers before storage
/// (see `scrub_images_for_storage`).
pub parts: Vec<ContentPart>,
/// How the user input was entered, if known; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub input_kind: Option<UserInputKind>,
/// Id of the user who sent the message, if known; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub user_id: Option<String>,
/// Present only on messages written by `SessionStore::append_report`.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub report_meta: Option<ReportMeta>,
}
/// Structured metadata attached to a report message (see `SessionStore::append_report`).
/// Field meanings below are inferred from names — NOTE(review): confirm against the
/// report producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReportMeta {
/// Presumably the name of the reporting component.
pub source: String,
/// Presumably the host the report came from; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub hostname: Option<String>,
/// Short summary of the report.
pub summary: String,
/// Optional long-form body; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub body: Option<String>,
/// Optional list of file references; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub files: Option<Vec<String>>,
}
impl StoredMessage {
    /// Snapshots `msg` for persistence, stamping it with the current UTC time.
    ///
    /// `report_meta` is always `None` here; report messages are built directly by
    /// `SessionStore::append_report`.
    pub fn from_chat(msg: &ChatMessage) -> Self {
        let timestamp = Utc::now();
        Self {
            timestamp,
            role: msg.role.clone(),
            parts: msg.parts.clone(),
            input_kind: msg.input_kind.clone(),
            user_id: msg.user_id.clone(),
            report_meta: None,
        }
    }

    /// Converts the stored record back into a [`ChatMessage`], dropping the
    /// timestamp and any report metadata.
    pub fn into_chat_message(self) -> ChatMessage {
        let Self {
            role,
            parts,
            input_kind,
            user_id,
            ..
        } = self;
        ChatMessage {
            role,
            parts,
            input_kind,
            user_id,
        }
    }
}
/// Wrapper for the header line `{"meta": {...}}`; must be the first line of a
/// session file.
#[derive(Serialize, Deserialize)]
struct MetaLine {
meta: SessionMeta,
}
/// Marker line `{"closed_at": ...}`; its presence marks the session closed.
#[derive(Serialize, Deserialize)]
struct ClosedLine {
closed_at: DateTime<Utc>,
}
/// Line `{"session_title": "..."}`; the last one in the file becomes `SessionMeta::title`.
#[derive(Serialize, Deserialize)]
struct TitleLine {
session_title: String,
}
/// Summary line recognised by its `summary_at` key; the last valid one in a file
/// is returned by the loaders.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SummaryLine {
/// When the summary was written (UTC).
pub summary_at: DateTime<Utc>,
/// Summary text.
pub summary: String,
/// NOTE(review): presumably the timestamp of the last message covered by the
/// summary; `SessionStore::append_summary` always writes `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub up_to_timestamp: Option<DateTime<Utc>>,
}
/// Intraday digest line recognised by its `digest_at` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntradayDigestLine {
/// When the digest was written (UTC).
pub digest_at: DateTime<Utc>,
/// Digest text.
pub digest: String,
/// Caller-supplied start time; presumably the start of the period the digest
/// covers — confirm with callers of `append_intraday_digest`.
#[serde(skip_serializing_if = "Option::is_none")]
pub since: Option<DateTime<Utc>>,
}
/// Stores conversation sessions as append-only JSONL files under
/// `<base_dir>/<namespace>/<kind>/<session_id>.jsonl`.
pub struct SessionStore {
/// Root directory containing one sub-directory per namespace.
pub base_dir: PathBuf,
/// Sub-directory name (inside each namespace) holding this store's files.
pub kind: &'static str,
/// Optional workspace to notify about file updates/deletions.
ws_state: Option<Arc<Mutex<WorkspaceState>>>,
/// Cache of `session_id -> file path`, avoiding directory scans.
path_cache: Mutex<HashMap<String, PathBuf>>,
}
impl SessionStore {
#[allow(dead_code)]
pub fn new(base_dir: PathBuf, kind: &'static str) -> Self {
Self {
base_dir,
kind,
ws_state: None,
path_cache: Mutex::new(HashMap::new()),
}
}
pub fn with_workspace(
base_dir: PathBuf,
kind: &'static str,
ws_state: Arc<Mutex<WorkspaceState>>,
) -> Self {
Self {
base_dir,
kind,
ws_state: Some(ws_state),
path_cache: Mutex::new(HashMap::new()),
}
}
fn path_for_new(&self, session_id: &str, namespace: &str) -> PathBuf {
let p = self
.base_dir
.join(namespace)
.join(self.kind)
.join(format!("{session_id}.jsonl"));
if let Ok(mut cache) = self.path_cache.lock() {
cache.insert(session_id.to_string(), p.clone());
}
p
}
pub fn absolute_path_for(&self, session_id: &str) -> Option<PathBuf> {
self.resolve_path(session_id)
}
fn resolve_path(&self, session_id: &str) -> Option<PathBuf> {
if let Ok(cache) = self.path_cache.lock()
&& let Some(p) = cache.get(session_id)
{
return Some(p.clone());
}
let target = format!("{session_id}.jsonl");
for path in collect_session_files(&self.base_dir, self.kind) {
if path.file_name().and_then(|s| s.to_str()) == Some(target.as_str()) {
if let Ok(mut cache) = self.path_cache.lock() {
cache.insert(session_id.to_string(), path.clone());
}
return Some(path);
}
}
None
}
fn notify_updated(&self, abs_path: &Path) {
let Some(state) = &self.ws_state else { return };
let guard = match state.lock() {
Ok(g) => g,
Err(e) => {
warn!("WorkspaceState mutex poisoned: {e}");
return;
}
};
if !abs_path.starts_with(&guard.workspace.root) {
return;
}
if let Err(e) = guard.on_file_updated(abs_path) {
warn!(
"Failed to notify workspace of update {}: {e}",
abs_path.display()
);
}
}
#[allow(dead_code)]
fn notify_deleted(&self, abs_path: &Path) {
let Some(state) = &self.ws_state else { return };
let guard = match state.lock() {
Ok(g) => g,
Err(e) => {
warn!("WorkspaceState mutex poisoned: {e}");
return;
}
};
if !abs_path.starts_with(&guard.workspace.root) {
return;
}
if let Err(e) = guard.on_file_deleted(abs_path) {
warn!(
"Failed to notify workspace of delete {}: {e}",
abs_path.display()
);
}
}
#[allow(dead_code)]
pub fn delete_session(&self, session_id: &str) -> anyhow::Result<()> {
if let Some(path) = self.resolve_path(session_id) {
if path.exists() {
fs::remove_file(&path)?;
self.notify_deleted(&path);
}
if let Ok(mut cache) = self.path_cache.lock() {
cache.remove(session_id);
}
}
Ok(())
}
pub fn create_session(
&self,
key: &ConversationKey,
channel: &str,
namespace: &str,
) -> anyhow::Result<String> {
let session_id = Uuid::now_v7().to_string();
let path = self.path_for_new(&session_id, namespace);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let meta = SessionMeta {
session_id: session_id.clone(),
room_id: key.0.clone(),
thread_id: key.1.clone(),
channel: channel.to_string(),
created_at: Utc::now(),
public_id: None,
namespace: Some(namespace.to_string()),
project: None,
device_id: None,
room_profile: None,
title: None,
};
let line = serde_json::to_string(&MetaLine { meta })?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(session_id)
}
pub fn append(&self, session_id: &str, msg: &ChatMessage) -> anyhow::Result<()> {
let scrubbed = scrub_images_for_storage(msg);
let to_store = scrubbed.as_ref().unwrap_or(msg);
let stored = StoredMessage::from_chat(to_store);
let line = serde_json::to_string(&stored)?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
pub fn append_summary(&self, session_id: &str, summary: &str) -> anyhow::Result<()> {
let line = serde_json::to_string(&SummaryLine {
summary_at: Utc::now(),
summary: summary.to_string(),
up_to_timestamp: None,
})?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
pub fn append_intraday_digest(
&self,
session_id: &str,
digest: &str,
since: Option<DateTime<Utc>>,
) -> anyhow::Result<()> {
let line = serde_json::to_string(&IntradayDigestLine {
digest_at: Utc::now(),
digest: digest.to_string(),
since,
})?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
pub fn intraday_digests_for_day(
&self,
date: NaiveDate,
boundary_hour: u8,
) -> Vec<(SessionMeta, IntradayDigestLine)> {
let (day_start, day_end) = day_window(date, boundary_hour);
let mut out = Vec::new();
for path in collect_session_files(&self.base_dir, self.kind) {
if let Ok(meta_fs) = path.metadata()
&& let Ok(mtime) = meta_fs.modified()
{
let mtime_utc: DateTime<Utc> = mtime.into();
if mtime_utc < day_start {
continue;
}
}
let Some((meta, digest)) = load_meta_and_latest_intraday_digest(&path) else {
continue;
};
let Some(d) = digest else { continue };
if d.digest_at >= day_start && d.digest_at < day_end {
out.push((meta, d));
}
}
out.sort_by_key(|(meta, _)| meta.created_at);
out
}
pub fn close_session(&self, session_id: &str) -> anyhow::Result<()> {
let line = serde_json::to_string(&ClosedLine {
closed_at: Utc::now(),
})?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
#[allow(clippy::type_complexity)]
pub fn load_all(
&self,
) -> (
HashMap<ConversationKey, String>,
HashMap<ConversationKey, String>,
HashMap<ConversationKey, Vec<ChatMessage>>,
) {
type SessionEntry = (
String,
ConversationKey,
Vec<StoredMessage>,
bool,
Option<String>,
);
let mut entries: Vec<SessionEntry> = Vec::new();
for path in collect_session_files(&self.base_dir, self.kind) {
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => continue,
};
if let Some((meta, messages, is_closed, summary)) = load_session_file(&path) {
let key: ConversationKey = (meta.room_id.clone(), meta.thread_id.clone());
if !is_closed {
if let Ok(mut cache) = self.path_cache.lock() {
cache.insert(stem.clone(), path.clone());
}
}
entries.push((stem, key, messages, is_closed, summary.map(|s| s.summary)));
}
}
entries.sort_by(|a, b| a.0.cmp(&b.0));
let mut active: HashMap<ConversationKey, String> = HashMap::new();
let mut summaries: HashMap<ConversationKey, String> = HashMap::new();
let mut fallback: HashMap<ConversationKey, Vec<ChatMessage>> = HashMap::new();
for (session_id, key, messages, is_closed, summary) in entries {
if !is_closed {
active.insert(key.clone(), session_id);
match summary {
Some(s) => {
summaries.insert(key.clone(), s);
fallback.remove(&key);
}
None => {
summaries.remove(&key);
if !messages.is_empty() {
let chat_messages: Vec<ChatMessage> = messages
.into_iter()
.map(|m| m.into_chat_message())
.collect();
fallback.insert(key, chat_messages);
} else {
fallback.remove(&key);
}
}
}
}
}
(active, summaries, fallback)
}
pub fn list_sessions(&self) -> Vec<SessionMeta> {
let mut metas: Vec<SessionMeta> = collect_session_files(&self.base_dir, self.kind)
.into_iter()
.filter_map(|p| load_session_file(&p).map(|(meta, _, _, _)| meta))
.collect();
metas.sort_by_key(|m| m.created_at);
metas
}
pub fn load_session(&self, session_id: &str) -> Option<Vec<ChatMessage>> {
let path = self.resolve_path(session_id)?;
let (_, messages, _, _) = load_session_file(&path)?;
Some(
messages
.into_iter()
.map(|m| m.into_chat_message())
.collect(),
)
}
pub fn load_session_full(
&self,
session_id: &str,
) -> Option<(Vec<StoredMessage>, Option<SummaryLine>)> {
let path = self.resolve_path(session_id)?;
let (_, messages, _, summary) = load_session_file(&path)?;
Some((messages, summary))
}
pub fn create_mcp_session(&self, namespace: &str, project: &str) -> anyhow::Result<String> {
let session_id = Uuid::now_v7().to_string();
let path = self.path_for_new(&session_id, namespace);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let meta = SessionMeta {
session_id: session_id.clone(),
room_id: String::new(),
thread_id: None,
channel: "mcp".to_string(),
created_at: Utc::now(),
public_id: None,
namespace: Some(namespace.to_string()),
project: Some(project.to_string()),
device_id: None,
room_profile: None,
title: None,
};
let line = serde_json::to_string(&MetaLine { meta })?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(session_id)
}
pub fn append_report(
&self,
session_id: &str,
rendered_text: &str,
meta: ReportMeta,
) -> anyhow::Result<()> {
let stored = StoredMessage {
timestamp: Utc::now(),
role: Role::User,
parts: vec![ContentPart::Text(rendered_text.to_string())],
input_kind: None,
user_id: None,
report_meta: Some(meta),
};
let line = serde_json::to_string(&stored)?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
pub fn ensure_session(
&self,
session_id: &str,
key: &ConversationKey,
channel: &str,
public_id_override: Option<String>,
namespace: &str,
) -> anyhow::Result<Option<String>> {
if let Some(existing) = self.resolve_path(session_id) {
let pub_id = load_session_file(&existing).and_then(|(meta, _, _, _)| meta.public_id);
return Ok(pub_id);
}
let path = self.path_for_new(session_id, namespace);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let public_id = if channel == "rpc" {
Some(public_id_override.unwrap_or_else(|| grain_id::GrainId::random().to_string()))
} else {
None
};
let meta = SessionMeta {
session_id: session_id.to_string(),
room_id: key.0.clone(),
thread_id: key.1.clone(),
channel: channel.to_string(),
created_at: Utc::now(),
public_id: public_id.clone(),
namespace: Some(namespace.to_string()),
project: None,
device_id: None,
room_profile: None,
title: None,
};
let line = serde_json::to_string(&MetaLine { meta })?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(public_id)
}
pub fn find_or_create_for_device(
&self,
device_id: &str,
room_profile: &str,
namespace: &str,
boundary_hour: u8,
) -> anyhow::Result<String> {
let today = local_date_for_timestamp(Local::now(), boundary_hour);
let (today_start, today_end) = day_window(today, boundary_hour);
let mut best: Option<(DateTime<Utc>, String)> = None;
for path in collect_session_files(&self.base_dir, self.kind) {
let Some((meta, _, is_closed, _)) = load_session_file(&path) else {
continue;
};
if is_closed {
continue;
}
if meta.namespace.as_deref() != Some(namespace) {
continue;
}
if meta.device_id.as_deref() != Some(device_id) {
continue;
}
if meta.room_profile.as_deref() != Some(room_profile) {
continue;
}
if meta.created_at < today_start || meta.created_at >= today_end {
continue;
}
match &best {
Some((ts, _)) if *ts >= meta.created_at => {}
_ => best = Some((meta.created_at, meta.session_id.clone())),
}
}
if let Some((_, id)) = best {
let _ = self.resolve_path(&id);
return Ok(id);
}
let session_id = Uuid::now_v7().to_string();
let path = self.path_for_new(&session_id, namespace);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let meta = SessionMeta {
session_id: session_id.clone(),
room_id: String::new(),
thread_id: None,
channel: "device-default".to_string(),
created_at: Utc::now(),
public_id: None,
namespace: Some(namespace.to_string()),
project: None,
device_id: Some(device_id.to_string()),
room_profile: Some(room_profile.to_string()),
title: None,
};
let line = serde_json::to_string(&MetaLine { meta })?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(session_id)
}
pub fn set_title(&self, session_id: &str, title: &str) -> anyhow::Result<()> {
let line = serde_json::to_string(&TitleLine {
session_title: title.to_string(),
})?;
let path = self
.resolve_path(session_id)
.ok_or_else(|| anyhow::anyhow!("Session file not found for {session_id}"))?;
let mut file = OpenOptions::new().create(true).append(true).open(&path)?;
writeln!(file, "{line}")?;
drop(file);
self.notify_updated(&path);
Ok(())
}
pub fn find_by_public_id(&self, public_id: &str) -> Option<String> {
for path in collect_session_files(&self.base_dir, self.kind) {
if let Some((meta, _, _, _)) = load_session_file(&path)
&& meta.public_id.as_deref() == Some(public_id)
{
return Some(meta.session_id);
}
}
None
}
pub fn sessions_for_day(
&self,
date: NaiveDate,
boundary_hour: u8,
) -> Vec<(SessionMeta, Vec<StoredMessage>)> {
let (day_start, day_end) = day_window(date, boundary_hour);
let mut results = Vec::new();
for path in collect_session_files(&self.base_dir, self.kind) {
if let Ok(meta_fs) = path.metadata()
&& let Ok(mtime) = meta_fs.modified()
{
let mtime_utc: DateTime<Utc> = mtime.into();
if mtime_utc < day_start - Duration::days(1) {
continue;
}
}
if let Some((meta, messages, _, _)) = load_session_file(&path) {
let day_messages: Vec<StoredMessage> = messages
.into_iter()
.filter(|m| m.timestamp >= day_start && m.timestamp < day_end)
.collect();
if !day_messages.is_empty() {
results.push((meta, day_messages));
}
}
}
results.sort_by_key(|(meta, _)| meta.created_at);
results
}
pub fn sessions_for_day_filtered<F>(
&self,
date: NaiveDate,
boundary_hour: u8,
predicate: F,
) -> Vec<(SessionMeta, Vec<StoredMessage>)>
where
F: Fn(&SessionMeta) -> bool,
{
self.sessions_for_day(date, boundary_hour)
.into_iter()
.filter(|(meta, _)| predicate(meta))
.collect()
}
pub fn all_session_dates_filtered<F>(&self, boundary_hour: u8, predicate: F) -> Vec<NaiveDate>
where
F: Fn(&SessionMeta) -> bool,
{
let mut dates = std::collections::HashSet::new();
for path in collect_session_files(&self.base_dir, self.kind) {
if let Some((meta, messages, _, _)) = load_session_file(&path) {
if !predicate(&meta) {
continue;
}
for msg in messages {
let local_ts = msg.timestamp.with_timezone(&Local);
let date = local_date_for_timestamp(local_ts, boundary_hour);
dates.insert(date);
}
}
}
let mut sorted: Vec<NaiveDate> = dates.into_iter().collect();
sorted.sort();
sorted
}
#[allow(dead_code)]
pub fn all_session_dates(&self, boundary_hour: u8) -> Vec<NaiveDate> {
let mut dates = std::collections::HashSet::new();
for path in collect_session_files(&self.base_dir, self.kind) {
if let Some((_, messages, _, _)) = load_session_file(&path) {
for msg in messages {
let local_ts = msg.timestamp.with_timezone(&Local);
let date = local_date_for_timestamp(local_ts, boundary_hour);
dates.insert(date);
}
}
}
let mut sorted: Vec<NaiveDate> = dates.into_iter().collect();
sorted.sort();
sorted
}
}
/// Returns a copy of `msg` in which every inline `ContentPart::Image` is replaced
/// by a text marker `[image: <media_type> sha256=<hex>]`. The hash is taken over the
/// decoded bytes; `invalid-base64` is used when the payload does not decode.
///
/// Returns `None` when `msg` has no inline images, so callers can store the
/// original unchanged. Other parts, including `ImageRef`, are cloned as-is.
pub(crate) fn scrub_images_for_storage(msg: &ChatMessage) -> Option<ChatMessage> {
    let has_inline_image = msg
        .parts
        .iter()
        .any(|part| matches!(part, ContentPart::Image { .. }));
    if !has_inline_image {
        return None;
    }

    let mut scrubbed_parts = Vec::with_capacity(msg.parts.len());
    for part in &msg.parts {
        let replacement = if let ContentPart::Image {
            media_type,
            data_base64,
        } = part
        {
            let digest_hex = BASE64_STANDARD
                .decode(data_base64)
                .map(|raw| sha256_hex(&raw))
                .unwrap_or_else(|_| "invalid-base64".to_string());
            ContentPart::Text(format!("[image: {media_type} sha256={digest_hex}]"))
        } else {
            part.clone()
        };
        scrubbed_parts.push(replacement);
    }

    Some(ChatMessage {
        role: msg.role.clone(),
        parts: scrubbed_parts,
        input_kind: msg.input_kind.clone(),
        user_id: msg.user_id.clone(),
    })
}
/// Returns the SHA-256 digest of `bytes` as 64 lowercase hex characters.
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    Sha256::digest(bytes)
        .iter()
        .fold(String::with_capacity(64), |mut hex, byte| {
            // Writing into a String cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Lists every `*.jsonl` file in `<base_dir>/<namespace>/<kind>/`, across all
/// namespace directories directly under `base_dir`.
///
/// Unreadable directories and entries are skipped silently, and a missing
/// `base_dir` yields an empty list. The order follows `read_dir` and is unspecified.
fn collect_session_files(base_dir: &Path, kind: &str) -> Vec<PathBuf> {
    let Ok(namespaces) = fs::read_dir(base_dir) else {
        return Vec::new();
    };
    namespaces
        .flatten()
        .map(|ns_entry| ns_entry.path())
        .filter(|ns_dir| ns_dir.is_dir())
        .filter_map(|ns_dir| fs::read_dir(ns_dir.join(kind)).ok())
        .flat_map(|kind_entries| kind_entries.flatten())
        .map(|file_entry| file_entry.path())
        .filter(|candidate| candidate.extension().and_then(|ext| ext.to_str()) == Some("jsonl"))
        .collect()
}
fn day_window(date: NaiveDate, boundary_hour: u8) -> (DateTime<Utc>, DateTime<Utc>) {
let start_local = date
.and_hms_opt(boundary_hour as u32, 0, 0)
.expect("valid time");
let end_local = (date + Duration::days(1))
.and_hms_opt(boundary_hour as u32, 0, 0)
.expect("valid time");
let start_utc = Local
.from_local_datetime(&start_local)
.single()
.unwrap_or_else(|| Local.from_local_datetime(&start_local).earliest().unwrap())
.with_timezone(&Utc);
let end_utc = Local
.from_local_datetime(&end_local)
.single()
.unwrap_or_else(|| Local.from_local_datetime(&end_local).earliest().unwrap())
.with_timezone(&Utc);
(start_utc, end_utc)
}
/// Maps a local timestamp to its logical day. Times earlier than `boundary_hour`
/// on a calendar date still belong to the previous day.
pub fn local_date_for_timestamp(local_ts: DateTime<Local>, boundary_hour: u8) -> NaiveDate {
    let calendar_date = local_ts.date_naive();
    let before_boundary = local_ts.hour() < u32::from(boundary_hour);
    if before_boundary {
        calendar_date - Duration::days(1)
    } else {
        calendar_date
    }
}
/// Parses a whole session file into `(meta, messages, is_closed, latest_summary)`.
///
/// Returns `None` when the file cannot be opened or its first line is not a valid
/// meta header. Each later line is dispatched on its keys, in this order:
/// - `closed_at` marks the session closed.
/// - A string `session_title` sets `meta.title`; the last one wins.
/// - `summary_at` is parsed as a `SummaryLine`; the last valid one wins.
/// - `digest_at` lines are skipped.
/// - `timestamp` is parsed as a `StoredMessage`.
///
/// Blank lines are ignored. Unparseable or malformed lines are logged and skipped.
/// Reading stops at the first I/O error.
fn load_session_file(
    path: &Path,
) -> Option<(SessionMeta, Vec<StoredMessage>, bool, Option<SummaryLine>)> {
    let reader = BufReader::new(fs::File::open(path).ok()?);
    let mut lines = reader.lines();
    let header: MetaLine = serde_json::from_str(lines.next()?.ok()?.trim()).ok()?;
    let mut meta = header.meta;
    let mut messages = Vec::new();
    let mut is_closed = false;
    let mut latest_summary: Option<SummaryLine> = None;

    for line in lines.map_while(Result::ok) {
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        let value: serde_json::Value = match serde_json::from_str(line) {
            Ok(parsed) => parsed,
            Err(e) => {
                warn!("Skipping unparseable line in {}: {e}", path.display());
                continue;
            }
        };

        if value.get("closed_at").is_some() {
            is_closed = true;
            continue;
        }
        if let Some(title) = value.get("session_title").and_then(|v| v.as_str()) {
            meta.title = Some(title.to_string());
            continue;
        }
        if value.get("summary_at").is_some() {
            match serde_json::from_value::<SummaryLine>(value) {
                Ok(summary) => latest_summary = Some(summary),
                Err(e) => warn!("Skipping malformed summary in {}: {e}", path.display()),
            }
            continue;
        }
        if value.get("digest_at").is_some() {
            continue;
        }
        if value.get("timestamp").is_some() {
            match serde_json::from_value::<StoredMessage>(value) {
                Ok(stored) => messages.push(stored),
                Err(e) => warn!("Skipping malformed message in {}: {e}", path.display()),
            }
        }
    }

    Some((meta, messages, is_closed, latest_summary))
}
/// Reads a session file's meta header and its last valid intraday digest line.
///
/// Returns `None` when the file cannot be opened or its first line is not a valid
/// meta header. Lines that are blank, unparseable, lack a `digest_at` key, or fail
/// to parse as `IntradayDigestLine` are skipped silently. Reading stops at the
/// first I/O error.
fn load_meta_and_latest_intraday_digest(
    path: &Path,
) -> Option<(SessionMeta, Option<IntradayDigestLine>)> {
    let reader = BufReader::new(fs::File::open(path).ok()?);
    let mut lines = reader.lines();
    let header: MetaLine = serde_json::from_str(lines.next()?.ok()?.trim()).ok()?;

    let last_digest = lines
        .map_while(Result::ok)
        .filter_map(|line| {
            let trimmed = line.trim();
            if trimmed.is_empty() {
                return None;
            }
            let value: serde_json::Value = serde_json::from_str(trimmed).ok()?;
            value.get("digest_at")?;
            serde_json::from_value::<IntradayDigestLine>(value).ok()
        })
        .last();

    Some((header.meta, last_digest))
}
// Unit tests for image scrubbing, StoredMessage (de)serialization, and the
// device-default session lookup/rotation logic.
#[cfg(test)]
mod tests {
use super::*;
// Messages without inline images are stored as-is (scrub returns None).
#[test]
fn scrub_returns_none_when_no_images() {
let msg = ChatMessage::user("plain text");
assert!(scrub_images_for_storage(&msg).is_none());
}
// Inline images become a text marker carrying the media type and the SHA-256 of
// the decoded bytes.
#[test]
fn scrub_replaces_image_with_hash_marker() {
let bytes = b"\xff\xd8\xff\xe0fake-jpeg".to_vec();
let b64 = BASE64_STANDARD.encode(&bytes);
let msg =
ChatMessage::user_with_images("look", std::iter::once(("image/jpeg".to_string(), b64)));
let scrubbed = scrub_images_for_storage(&msg).expect("scrub should rewrite");
assert!(
!scrubbed
.parts
.iter()
.any(|p| matches!(p, ContentPart::Image { .. })),
"scrubbed message still contains Image part"
);
let expected = sha256_hex(&bytes);
let has_marker = scrubbed
.parts
.iter()
.any(|p| matches!(p, ContentPart::Text(s) if s.contains(&expected) && s.contains("image/jpeg")));
assert!(
has_marker,
"missing hash marker; parts={:?}",
scrubbed.parts
);
}
// Undecodable base64 must not panic; it is recorded as "invalid-base64".
#[test]
fn scrub_invalid_base64_records_marker_without_panic() {
let msg = ChatMessage {
role: Role::User,
parts: vec![ContentPart::Image {
media_type: "image/png".to_string(),
data_base64: "@@@not-base64@@@".to_string(),
}],
input_kind: Some(UserInputKind::Text),
user_id: None,
};
let scrubbed = scrub_images_for_storage(&msg).expect("scrub should rewrite");
let has_marker = scrubbed
.parts
.iter()
.any(|p| matches!(p, ContentPart::Text(s) if s.contains("invalid-base64")));
assert!(has_marker, "expected invalid-base64 marker");
}
// ImageRef parts are already hash references and are left untouched.
#[test]
fn scrub_passes_imageref_through_unchanged() {
let msg = ChatMessage {
role: Role::User,
parts: vec![ContentPart::ImageRef {
media_type: "image/jpeg".to_string(),
sha256: "abc123".to_string(),
}],
input_kind: Some(UserInputKind::Text),
user_id: None,
};
assert!(
scrub_images_for_storage(&msg).is_none(),
"scrub should leave ImageRef-only messages untouched"
);
}
// Older JSONL lines without the optional fields still deserialize.
#[test]
fn stored_message_without_input_kind_or_user_id_deserializes_as_none() {
let legacy = r#"{"timestamp":"2026-04-08T11:30:22.372570890Z","role":"user","parts":[{"Text":"hello"}]}"#;
let msg: StoredMessage = serde_json::from_str(legacy).expect("legacy JSONL parses");
assert!(msg.input_kind.is_none());
assert!(msg.user_id.is_none());
}
// `None` optional fields are omitted from the serialized line.
#[test]
fn stored_message_omits_none_fields_on_serialize() {
let msg = StoredMessage {
timestamp: Utc::now(),
role: Role::Assistant,
parts: vec![ContentPart::Text("hi".to_string())],
input_kind: None,
user_id: None,
report_meta: None,
};
let json = serde_json::to_string(&msg).unwrap();
assert!(
!json.contains("input_kind"),
"None input_kind must be omitted, got: {json}"
);
assert!(
!json.contains("user_id"),
"None user_id must be omitted, got: {json}"
);
assert!(
!json.contains("report_meta"),
"None report_meta must be omitted, got: {json}"
);
}
// Pins the on-disk shape of a text input_kind and user_id and checks the round trip.
#[test]
fn stored_message_text_input_kind_round_trip() {
let original = StoredMessage {
timestamp: Utc::now(),
role: Role::User,
parts: vec![ContentPart::Text("hi".to_string())],
input_kind: Some(UserInputKind::Text),
user_id: Some("owner".to_string()),
report_meta: None,
};
let json = serde_json::to_string(&original).unwrap();
assert!(json.contains(r#""input_kind":{"kind":"text"}"#));
assert!(json.contains(r#""user_id":"owner""#));
let parsed: StoredMessage = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.input_kind, Some(UserInputKind::Text));
assert_eq!(parsed.user_id.as_deref(), Some("owner"));
}
// Pins the on-disk shape of a voice input_kind.
#[test]
fn stored_message_voice_input_kind_round_trip() {
let original = StoredMessage {
timestamp: Utc::now(),
role: Role::User,
parts: vec![ContentPart::Text("hello there".to_string())],
input_kind: Some(UserInputKind::Voice),
user_id: None,
report_meta: None,
};
let json = serde_json::to_string(&original).unwrap();
assert!(
json.contains(r#""input_kind":{"kind":"voice"}"#),
"missing voice tag: {json}"
);
let parsed: StoredMessage = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.input_kind, Some(UserInputKind::Voice));
}
// ChatMessage -> StoredMessage -> ChatMessage keeps input_kind and user_id.
#[test]
fn from_chat_and_into_chat_preserve_input_kind_and_user_id() {
let chat = ChatMessage {
role: Role::User,
parts: vec![ContentPart::Text("hi".to_string())],
input_kind: Some(UserInputKind::Voice),
user_id: Some("alice".to_string()),
};
let stored = StoredMessage::from_chat(&chat);
assert_eq!(stored.input_kind, Some(UserInputKind::Voice));
assert_eq!(stored.user_id.as_deref(), Some("alice"));
let round = stored.into_chat_message();
assert_eq!(round.input_kind, Some(UserInputKind::Voice));
assert_eq!(round.user_id.as_deref(), Some("alice"));
}
// Fresh store in a temporary directory; keep the TempDir alive for the test's duration.
fn new_device_default_store() -> (tempfile::TempDir, SessionStore) {
let tmp = tempfile::TempDir::new().unwrap();
let store = SessionStore::new(tmp.path().to_path_buf(), "device-default");
(tmp, store)
}
// A first lookup creates a session whose header records device, profile and namespace.
#[test]
fn find_or_create_for_device_creates_new_session() {
let (_tmp, store) = new_device_default_store();
let sid = store
.find_or_create_for_device("device-a", "default", "default", 4)
.expect("find_or_create");
let path = store.absolute_path_for(&sid).expect("path cached");
let (meta, _msgs, is_closed, _summary) =
load_session_file(&path).expect("meta line present");
assert_eq!(meta.session_id, sid);
assert_eq!(meta.channel, "device-default");
assert_eq!(meta.device_id.as_deref(), Some("device-a"));
assert_eq!(meta.room_profile.as_deref(), Some("default"));
assert_eq!(meta.namespace.as_deref(), Some("default"));
assert!(meta.public_id.is_none(), "device-default has no grain-id");
assert!(!is_closed);
}
// Repeated lookups on the same logical day reuse the same session.
#[test]
fn find_or_create_for_device_is_idempotent_within_day() {
let (_tmp, store) = new_device_default_store();
let first = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
let second = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
assert_eq!(first, second);
}
// Different devices get different sessions.
#[test]
fn find_or_create_for_device_distinguishes_devices() {
let (_tmp, store) = new_device_default_store();
let a = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
let b = store
.find_or_create_for_device("device-b", "default", "default", 4)
.unwrap();
assert_ne!(a, b);
}
// Different room profiles (and namespaces) get different sessions.
#[test]
fn find_or_create_for_device_distinguishes_room_profiles() {
let (_tmp, store) = new_device_default_store();
let sfw = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
let nsfw = store
.find_or_create_for_device("device-a", "private_nsfw", "user_nsfw", 4)
.unwrap();
assert_ne!(sfw, nsfw);
}
// Closing a session forces the next lookup to create a new one.
#[test]
fn find_or_create_for_device_skips_closed_sessions() {
let (_tmp, store) = new_device_default_store();
let first = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
store.close_session(&first).expect("close");
let second = store
.find_or_create_for_device("device-a", "default", "default", 4)
.unwrap();
assert_ne!(
first, second,
"closed session must not be reused — boundary rotation depends on this"
);
}
// A hand-written session created during the previous logical day must be ignored.
#[test]
fn find_or_create_for_device_skips_yesterday_session() {
let (tmp, store) = new_device_default_store();
let boundary = 4u8;
let yesterday_date = local_date_for_timestamp(Local::now(), boundary) - Duration::days(1);
let (yesterday_start, _) = day_window(yesterday_date, boundary);
let stale_id = Uuid::now_v7().to_string();
let stale_dir = tmp.path().join("default").join("device-default");
fs::create_dir_all(&stale_dir).unwrap();
let stale_path = stale_dir.join(format!("{stale_id}.jsonl"));
let stale_meta = SessionMeta {
session_id: stale_id.clone(),
room_id: String::new(),
thread_id: None,
channel: "device-default".to_string(),
created_at: yesterday_start + Duration::hours(2),
public_id: None,
namespace: Some("default".to_string()),
project: None,
device_id: Some("device-a".to_string()),
room_profile: Some("default".to_string()),
title: None,
};
let line = serde_json::to_string(&MetaLine { meta: stale_meta }).unwrap();
std::fs::write(&stale_path, format!("{line}\n")).unwrap();
let fresh = store
.find_or_create_for_device("device-a", "default", "default", boundary)
.unwrap();
assert_ne!(
stale_id, fresh,
"yesterday's session must not be picked up; daily rotation depends on it"
);
}
}