use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use parking_lot::Mutex;
use uuid::Uuid;
use super::inverted_index::InvertedIndex;
use super::types::{
ConversationMessage, ConversationMessagePatch, ConversationThread, CreateConversationThread,
};
// Private submodule `ops`, loaded from `store_ops.rs` through the `#[path]` override.
// NOTE(review): presumably holds the `ConversationStore` methods that the free
// functions at the bottom of this file delegate to — confirm in store_ops.rs.
#[path = "store_ops.rs"]
mod ops;
// Private submodule `index`, loaded from `store_index.rs` through the `#[path]` override.
// NOTE(review): presumably the thread-index / search-index logic — confirm in store_index.rs.
#[path = "store_index.rs"]
mod index;
/// File name `threads.jsonl`.
/// NOTE(review): presumably the JSON Lines log of `ThreadLogEntry` records inside the
/// workspace directory — confirm where it is joined onto a path.
pub(super) const THREADS_FILENAME: &str = "threads.jsonl";
/// Directory name `threads`.
/// NOTE(review): presumably the directory holding per-thread message files — confirm
/// against the code that builds message paths.
pub(super) const THREAD_MESSAGES_DIR: &str = "threads";
/// Lazily initialised, process-wide mutex guarding no data of its own (`Mutex<()>`).
/// NOTE(review): nothing in this file takes the lock; presumably the submodules hold it
/// around store file operations — confirm in store_ops.rs.
static CONVERSATION_STORE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
/// Lazily initialised, process-wide map from a path to an `InvertedIndex`, behind a mutex.
/// NOTE(review): the key is presumably the workspace directory and the map presumably
/// caches one index per workspace — confirm in store_index.rs.
static CONVERSATION_INDEX_CACHE: LazyLock<Mutex<HashMap<PathBuf, InvertedIndex>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
/// Pair of counters returned by `purge_threads`.
///
/// `Default` yields both counters at zero.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConversationPurgeStats {
/// Number of threads counted.
/// NOTE(review): presumably the threads removed by the purge — confirm in store_ops.rs.
pub thread_count: usize,
/// Number of messages counted.
/// NOTE(review): presumably the messages removed by the purge — confirm in store_ops.rs.
pub message_count: usize,
}
/// Handle to the conversation store of one workspace.
///
/// The handle stores nothing but the workspace path, so cloning it is a path copy.
#[derive(Debug, Clone)]
pub struct ConversationStore {
// Directory this store operates on; set once by the constructors.
workspace_dir: PathBuf,
}
impl ConversationStore {
pub fn new(workspace_dir: PathBuf) -> Self {
Self { workspace_dir }
}
pub fn from_config(config: &crate::memory::config::MemoryConfig) -> Self {
Self::new(config.workspace.clone())
}
}
/// One record of the thread log.
///
/// Serialized as an internally tagged object: the `op` field carries the variant name in
/// snake_case (`upsert`, `delete`, `message_appended`, `stats`) next to the variant's fields.
/// NOTE(review): presumably these records are appended to `threads.jsonl` and replayed to
/// rebuild the thread index — confirm in store_index.rs.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub(super) enum ThreadLogEntry {
/// Thread metadata record (by its name, a create-or-update).
Upsert {
thread_id: String,
title: String,
created_at: String,
updated_at: String,
// The three optional fields below are left out of the JSON when `None`
// and read back as `None` when the key is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_thread_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
labels: Option<Vec<String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
personality_id: Option<String>,
},
/// Record that a thread was deleted, with the deletion timestamp string.
Delete {
thread_id: String,
deleted_at: String,
},
/// Record that a message was appended to a thread, with that message's timestamp string.
MessageAppended {
thread_id: String,
last_message_at: String,
},
/// Record carrying a message count and last-message timestamp for a thread.
/// NOTE(review): presumably a snapshot that supersedes earlier `MessageAppended`
/// records for the same thread — confirm in the replay code.
Stats {
thread_id: String,
message_count: usize,
last_message_at: String,
},
}
/// Per-thread metadata kept in memory.
/// NOTE(review): presumably the state accumulated while replaying `ThreadLogEntry`
/// records, keyed elsewhere by thread id (the id is not stored here) — confirm in
/// store_index.rs.
#[derive(Debug, Clone)]
pub(super) struct ThreadIndexEntry {
/// Thread title.
pub(super) title: String,
/// Creation timestamp, kept as a string.
pub(super) created_at: String,
/// Id of the parent thread, if there is one.
pub(super) parent_thread_id: Option<String>,
/// Labels attached to the thread.
pub(super) labels: Vec<String>,
/// Message count, or `None` when no count is available.
pub(super) message_count: Option<usize>,
/// Timestamp string of the latest message, or `None` when not available.
pub(super) last_message_at: Option<String>,
/// Personality id associated with the thread, if any.
pub(super) personality_id: Option<String>,
}
/// Derives the label list for a thread from its id alone.
///
/// * exactly `proactive:morning_briefing` gives `["briefing"]`
/// * any other id starting with `proactive:` gives `["notification"]`
/// * everything else, including the empty string, gives `["general"]`
///
/// The result always holds exactly one label.
pub(super) fn infer_labels(thread_id: &str) -> Vec<String> {
    let label = match thread_id {
        "proactive:morning_briefing" => "briefing",
        id if id.starts_with("proactive:") => "notification",
        _ => "general",
    };
    vec![label.to_owned()]
}
/// Maps certain label names to replacements and removes duplicates.
///
/// Renames applied:
/// * `work` becomes `general`
/// * `from_reflection` and `subconscious_tick` become `subconscious`
/// * `agent-task` and `worker` become `tasks`
///
/// Any other label passes through unchanged. Duplicates are dropped *after*
/// renaming, and the first occurrence decides the position in the output.
pub(super) fn normalize_labels(labels: Vec<String>) -> Vec<String> {
    let mut unique: Vec<String> = Vec::with_capacity(labels.len());
    let canonical = labels.into_iter().map(|raw| match raw.as_str() {
        "work" => String::from("general"),
        "from_reflection" | "subconscious_tick" => String::from("subconscious"),
        "agent-task" | "worker" => String::from("tasks"),
        _ => raw,
    });
    for label in canonical {
        let already_seen = unique.iter().any(|seen| *seen == label);
        if !already_seen {
            unique.push(label);
        }
    }
    unique
}
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
///
/// The high nibble of each byte comes first. An empty slice gives an empty string.
pub(super) fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(
        bytes
            .iter()
            .flat_map(|&byte| {
                [
                    DIGITS[usize::from(byte >> 4)],
                    DIGITS[usize::from(byte & 0x0f)],
                ]
            })
            .map(char::from),
    );
    encoded
}
/// Reads a JSON Lines file into a vector of `T`, one value per non-blank line.
///
/// * A file that does not exist gives `Ok` with an empty vector.
/// * Blank or whitespace-only lines are ignored.
/// * Lines that fail to deserialize as `T` are skipped; they do not fail the read.
///
/// # Errors
///
/// Returns a message when the file cannot be opened for any reason other than
/// "not found", or when reading a line fails (for example on invalid UTF-8).
pub(super) fn read_jsonl<T>(path: &Path) -> Result<Vec<T>, String>
where
    T: for<'de> serde::Deserialize<'de>,
{
    // Open directly instead of probing with `Path::exists` first. `exists` reports
    // `false` for metadata errors such as permission denied, which would make an
    // unreadable file look empty, and a separate check can race with a concurrent
    // delete or create. Only a genuine "not found" maps to the empty result.
    let file = match File::open(path) {
        Ok(file) => file,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(format!("open {}: {e}", path.display())),
    };
    let reader = BufReader::new(file);
    let mut items = Vec::new();
    for (line_no, line) in reader.lines().enumerate() {
        let line =
            line.map_err(|e| format!("read {} line {}: {e}", path.display(), line_no + 1))?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Best-effort parse: an undecodable line is dropped so the remaining
        // records stay readable.
        if let Ok(value) = serde_json::from_str::<T>(trimmed) {
            items.push(value);
        }
    }
    Ok(items)
}
/// Appends `value` to the JSON Lines file at `path` as one line and syncs it to disk.
///
/// Missing parent directories and a missing file are created first. The record and
/// its trailing newline are written with a single `write_all` call.
///
/// # Errors
///
/// Returns a message when `path` has no parent, or when creating the directory,
/// opening the file, serializing `value`, writing, or syncing fails.
pub(super) fn append_jsonl<T>(path: &Path, value: &T) -> Result<(), String>
where
    T: serde::Serialize,
{
    let parent = path
        .parent()
        .ok_or_else(|| format!("resolve parent dir for {}", path.display()))?;
    fs::create_dir_all(parent)
        .map_err(|e| format!("create jsonl dir {}: {e}", parent.display()))?;
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .map_err(|e| format!("open {} for append: {e}", path.display()))?;
    let mut line = serde_json::to_string(value)
        .map_err(|e| format!("serialize jsonl line for {}: {e}", path.display()))?;
    // Build the complete record, newline included, before writing. `writeln!` on an
    // unbuffered file issues the payload and the newline as separate writes, so a
    // crash or another appender in between could leave a split or interleaved record.
    line.push('\n');
    file.write_all(line.as_bytes())
        .map_err(|e| format!("write {}: {e}", path.display()))?;
    file.sync_all()
        .map_err(|e| format!("sync {}: {e}", path.display()))?;
    Ok(())
}
/// Replaces the JSON Lines file at `path` with `values`, one serialized value per line.
///
/// The new content is written to a uniquely named temporary file in the same directory
/// as `path`, synced, and then renamed over `path`. The temporary file is removed
/// (best effort) on any failure. An empty `values` slice produces an empty file.
///
/// # Errors
///
/// Returns a message when `path` has no parent, or when creating the directory,
/// creating/writing/syncing the temporary file, serializing a value, or the final
/// rename fails.
pub(super) fn rewrite_jsonl<T>(path: &Path, values: &[T]) -> Result<(), String>
where
    T: serde::Serialize,
{
    let parent = path
        .parent()
        .ok_or_else(|| format!("resolve parent dir for {}", path.display()))?;
    fs::create_dir_all(parent)
        .map_err(|e| format!("create jsonl dir {}: {e}", parent.display()))?;
    // Keep the temp file beside the target so the rename never crosses file systems.
    let tmp_path = parent.join(format!(".conversations-{}.tmp", Uuid::new_v4()));
    let write_result = (|| -> Result<(), String> {
        let temp = File::create(&tmp_path)
            .map_err(|e| format!("create temp jsonl in {}: {e}", parent.display()))?;
        // Buffer the writes: an unbuffered `File` would pay separate syscalls for
        // every record and its newline.
        let mut writer = std::io::BufWriter::new(temp);
        for value in values {
            let line = serde_json::to_string(value)
                .map_err(|e| format!("serialize jsonl line for {}: {e}", path.display()))?;
            writeln!(writer, "{line}")
                .map_err(|e| format!("write temp jsonl for {}: {e}", path.display()))?;
        }
        // `into_inner` flushes and surfaces the error; relying on drop would swallow it.
        let temp = writer.into_inner().map_err(|e| {
            format!("write temp jsonl for {}: {}", path.display(), e.error())
        })?;
        temp.sync_all()
            .map_err(|e| format!("sync temp jsonl for {}: {e}", path.display()))
    })();
    if let Err(error) = write_result {
        let _ = fs::remove_file(&tmp_path);
        return Err(error);
    }
    if let Err(error) = fs::rename(&tmp_path, path) {
        let _ = fs::remove_file(&tmp_path);
        return Err(format!("persist {}: {error}", path.display()));
    }
    Ok(())
}
/// Free-function form of `ConversationStore::ensure_thread`.
///
/// Builds a store handle for `workspace_dir`, passes `request` to its `ensure_thread`
/// method, and returns that method's result unchanged.
pub fn ensure_thread(
    workspace_dir: PathBuf,
    request: CreateConversationThread,
) -> Result<ConversationThread, String> {
    let store = ConversationStore::new(workspace_dir);
    store.ensure_thread(request)
}
/// Free-function form of `ConversationStore::list_threads`.
///
/// Builds a store handle for `workspace_dir` and returns the result of its
/// `list_threads` method unchanged.
pub fn list_threads(workspace_dir: PathBuf) -> Result<Vec<ConversationThread>, String> {
    let store = ConversationStore::new(workspace_dir);
    store.list_threads()
}
/// Free-function form of `ConversationStore::get_messages`.
///
/// Builds a store handle for `workspace_dir`, asks it for the messages of
/// `thread_id`, and returns the result unchanged.
pub fn get_messages(
    workspace_dir: PathBuf,
    thread_id: &str,
) -> Result<Vec<ConversationMessage>, String> {
    let store = ConversationStore::new(workspace_dir);
    store.get_messages(thread_id)
}
/// Free-function form of `ConversationStore::append_message`.
///
/// Builds a store handle for `workspace_dir`, passes `thread_id` and `message` to its
/// `append_message` method, and returns the result unchanged.
pub fn append_message(
    workspace_dir: PathBuf,
    thread_id: &str,
    message: ConversationMessage,
) -> Result<ConversationMessage, String> {
    let store = ConversationStore::new(workspace_dir);
    store.append_message(thread_id, message)
}
/// Free-function form of `ConversationStore::update_thread_title`.
///
/// Builds a store handle for `workspace_dir`, passes `thread_id`, `title` and
/// `updated_at` to its `update_thread_title` method, and returns the result unchanged.
pub fn update_thread_title(
    workspace_dir: PathBuf,
    thread_id: &str,
    title: &str,
    updated_at: &str,
) -> Result<ConversationThread, String> {
    let store = ConversationStore::new(workspace_dir);
    store.update_thread_title(thread_id, title, updated_at)
}
/// Free-function form of `ConversationStore::update_thread_labels`.
///
/// Builds a store handle for `workspace_dir`, passes `thread_id`, `labels` and
/// `updated_at` to its `update_thread_labels` method, and returns the result unchanged.
pub fn update_thread_labels(
    workspace_dir: PathBuf,
    thread_id: &str,
    labels: Vec<String>,
    updated_at: &str,
) -> Result<ConversationThread, String> {
    let store = ConversationStore::new(workspace_dir);
    store.update_thread_labels(thread_id, labels, updated_at)
}
/// Free-function form of `ConversationStore::update_message`.
///
/// Builds a store handle for `workspace_dir`, passes `thread_id`, `message_id` and
/// `patch` to its `update_message` method, and returns the result unchanged.
pub fn update_message(
    workspace_dir: PathBuf,
    thread_id: &str,
    message_id: &str,
    patch: ConversationMessagePatch,
) -> Result<ConversationMessage, String> {
    let store = ConversationStore::new(workspace_dir);
    store.update_message(thread_id, message_id, patch)
}
/// Free-function form of `ConversationStore::purge_threads`.
///
/// Builds a store handle for `workspace_dir` and returns the `ConversationPurgeStats`
/// (or error) from its `purge_threads` method unchanged.
pub fn purge_threads(workspace_dir: PathBuf) -> Result<ConversationPurgeStats, String> {
    let store = ConversationStore::new(workspace_dir);
    store.purge_threads()
}
/// Free-function form of `ConversationStore::delete_thread`.
///
/// Builds a store handle for `workspace_dir`, passes `thread_id` and `deleted_at` to its
/// `delete_thread` method, and returns the resulting flag (or error) unchanged.
pub fn delete_thread(
    workspace_dir: PathBuf,
    thread_id: &str,
    deleted_at: &str,
) -> Result<bool, String> {
    let store = ConversationStore::new(workspace_dir);
    store.delete_thread(thread_id, deleted_at)
}
// Test module, loaded from `store_tests.rs` and compiled only for test builds.
#[cfg(test)]
#[path = "store_tests.rs"]
mod tests;