mod agent;
mod channel;
mod config;
mod context_compression;
mod frontmatter;
mod heartbeat;
mod heartbeat_config;
mod image_cache;
mod mcp_client;
mod memory_compaction;
mod periodic_log;
mod provider;
mod serve;
mod session;
mod timer;
mod tools;
mod voice;
mod workspace;
use agent::Agent;
use anyhow::{Context, Result};
use channel::discord::DiscordChannel;
use channel::matrix::MatrixChannel;
use clap::{Parser, Subcommand};
use config::Config;
use heartbeat::Heartbeat;
use periodic_log::{
build_all_today_digests, catchup_missing_daily_digests, catchup_pending_daily_logs,
catchup_pending_monthly_logs, catchup_pending_weekly_logs, catchup_pending_yearly_logs,
};
use provider::registry::ProviderRegistry;
use sapphire_workspace::{AppContext, DeviceDefaults, Workspace as SwWorkspace, WorkspaceState};
static APP_CTX: AppContext = AppContext::new("sapphire-agent").allow_external_paths();
fn init_app_ctx() {
let base = directories::BaseDirs::new();
let cache_dir = base
.as_ref()
.map(|b| b.cache_dir().to_path_buf())
.unwrap_or_else(std::env::temp_dir)
.join(env!("CARGO_PKG_NAME"));
let data_dir = base
.as_ref()
.map(|b| b.data_dir().to_path_buf())
.unwrap_or_else(std::env::temp_dir)
.join(env!("CARGO_PKG_NAME"));
APP_CTX.set_cache_dir(cache_dir);
APP_CTX.set_data_dir(data_dir);
let hostname = hostname::get()
.ok()
.and_then(|s| s.into_string().ok())
.unwrap_or_default();
APP_CTX.set_device_defaults(DeviceDefaults {
hostname,
app_id: env!("CARGO_PKG_NAME").to_owned(),
app_version: env!("CARGO_PKG_VERSION").to_owned(),
platform: std::env::consts::OS.to_owned(),
arch: std::env::consts::ARCH.to_owned(),
});
}
use session::SessionStore;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing_subscriber::{EnvFilter, fmt};
use workspace::Workspace;
#[derive(Parser)]
#[command(
name = "sapphire-agent",
about = "Personal AI assistant — Anthropic + Matrix/Discord"
)]
struct Cli {
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
#[arg(long, value_name = "ADDR")]
bind: Option<String>,
#[command(subcommand)]
command: Option<Command>,
}
#[derive(Subcommand)]
enum Command {
Verify,
}
#[tokio::main]
async fn main() -> Result<()> {
fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.init();
init_app_ctx();
let cli = Cli::parse();
let config_path = cli.config.unwrap_or_else(Config::default_path);
let config = Config::load(&config_path)
.with_context(|| format!("Failed to load config from {}", config_path.display()))?;
match cli.command {
Some(Command::Verify) => {
let workspace_dir = config.resolved_workspace_dir(&config_path);
println!("Config OK");
if let Some(m) = &config.matrix {
println!(" Channel : matrix");
println!(" Matrix homeserver : {}", m.homeserver);
println!(" Matrix user_id : {}", m.user_id);
println!(" Matrix rooms : {:?}", m.room_ids);
} else if let Some(d) = &config.discord {
println!(" Channel : discord");
println!(" Discord channels : {:?}", d.channel_ids);
println!(" Allowed users : {:?}", d.allowed_users);
} else {
println!(" Channel : NONE (add [discord] or [matrix] to config)");
}
println!(" Anthropic model : {}", config.anthropic.model);
println!(" Anthropic max_tok : {}", config.anthropic.max_tokens);
println!(" Workspace dir : {}", workspace_dir.display());
println!(
" Day boundary hour : {}:00 local",
config.day_boundary_hour
);
println!(" Heartbeat enabled : {}", config.heartbeat_enabled);
println!(" Standby mode : {}", config.standby_mode);
println!();
let workspace_files = [
("AGENTS.md / AGENT.md", vec!["AGENTS.md", "AGENT.md"]),
("SOUL.md", vec!["SOUL.md"]),
("IDENTITY.md", vec!["IDENTITY.md"]),
("USER.md", vec!["USER.md"]),
("TOOLS.md", vec!["TOOLS.md"]),
("BOOTSTRAP.md", vec!["BOOTSTRAP.md"]),
("MEMORY.md", vec!["MEMORY.md", "memory.md"]),
];
for (label, candidates) in &workspace_files {
let found = candidates.iter().find_map(|f| {
let p = workspace_dir.join(f);
if p.exists() { Some(*f) } else { None }
});
match found {
Some(f) => println!(" {label:<28} found ({f})"),
None => println!(" {label:<28} -"),
}
}
}
None => {
let bind = cli.bind;
let workspace_dir = config.resolved_workspace_dir(&config_path);
if let Err(e) = migrate_pre_namespace_layout(&workspace_dir) {
anyhow::bail!("Memory layout migration failed: {e:#}");
}
let sessions_base_for_migration = config.resolved_sessions_dir(&workspace_dir);
if let Err(e) = migrate_per_channel_sessions(&sessions_base_for_migration) {
anyhow::bail!("Session layout migration failed: {e:#}");
}
if let Err(e) = migrate_sessions_to_namespaced_layout(&sessions_base_for_migration) {
anyhow::bail!("Session namespace migration failed: {e:#}");
}
if let Err(e) = migrate_to_device_default_layout(&sessions_base_for_migration) {
anyhow::bail!("Device-default session migration failed: {e:#}");
}
let workspace = Arc::new(Workspace::new(workspace_dir.clone(), config.digest.clone()));
let sw_workspace = SwWorkspace::resolve(&APP_CTX, Some(&workspace_dir))
.context("Failed to resolve sapphire-workspace")?;
let sync_config = config.sync.clone().unwrap_or_default();
let ws_sync_interval = config
.sync_interval_minutes
.filter(|&m| m > 0)
.map(|m| std::time::Duration::from_secs(m as u64 * 60));
let ws_state = WorkspaceState::open_configured(sw_workspace, &sync_config)
.context("Failed to open WorkspaceState")?;
if let Err(e) = ws_state.periodic_sync() {
tracing::warn!("Initial workspace sync failed: {e}");
}
let ws_state = Arc::new(Mutex::new(ws_state));
if config.standby_mode
&& let Some(dur) = ws_sync_interval
{
tracing::info!("Periodic workspace sync enabled: every {}s", dur.as_secs());
let ws = Arc::clone(&ws_state);
tokio::spawn(async move {
let mut tick = tokio::time::interval(dur);
tick.tick().await;
loop {
tick.tick().await;
let state = ws.lock().expect("ws_state mutex poisoned");
match state.periodic_sync() {
Ok((u, r)) => {
tracing::info!("Periodic ws sync: {u} upserted, {r} removed");
}
Err(e) => tracing::warn!("Periodic ws sync failed: {e:#}"),
}
}
});
}
let timer_manager = timer::TimerManager::new();
let tool_set = tools::default_tool_set(
Arc::clone(&ws_state),
config.tools.tavily_api_key.clone(),
&config.tools.mcp_servers,
Arc::clone(&timer_manager),
config.timer.presets.clone(),
)
.await;
let sessions_base = config.resolved_sessions_dir(&workspace_dir);
let registry = Arc::new(
ProviderRegistry::from_config(&config)
.context("Failed to build provider registry")?,
);
let cross_device_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.clone(),
"cross-device",
Arc::clone(&ws_state),
));
let device_default_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.clone(),
"device-default",
Arc::clone(&ws_state),
));
let mcp_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.clone(),
"mcp",
Arc::clone(&ws_state),
));
let voice_providers = if config.stt_providers.is_empty()
&& config.tts_providers.is_empty()
|| config.standby_mode
{
None
} else {
let cfg = config.clone();
let providers =
tokio::task::spawn_blocking(move || voice::VoiceProviders::from_config(&cfg))
.await
.map_err(|e| anyhow::anyhow!("voice provider init panicked: {e}"))??;
Some(Arc::new(providers))
};
let image_cache: Option<Arc<image_cache::ImageCache>> = if config.image_cache.enabled {
let resolved = config
.image_cache
.dir
.clone()
.or_else(image_cache::ImageCache::default_dir);
match resolved {
Some(dir) => match image_cache::ImageCache::open(dir.clone()) {
Ok(cache) => {
tracing::info!("Image cache opened at {dir:?}");
Some(cache)
}
Err(e) => {
tracing::warn!(
"Image cache open failed ({e:?}); falling back to text-marker only"
);
None
}
},
None => {
tracing::warn!(
"No platform cache dir resolvable; image cache disabled (set [image_cache] dir = \"...\" to override)"
);
None
}
}
} else {
None
};
if let Some(cache) = image_cache.clone() {
tool_set
.register_tool(Box::new(tools::builtin_tools::RecallImageTool::new(cache)))
.await;
}
let serve_state = Arc::new(serve::ServeState::new(
config.clone(),
Arc::clone(®istry),
Arc::clone(&workspace),
Arc::clone(&tool_set),
Arc::clone(&cross_device_session_store),
Arc::clone(&device_default_session_store),
Arc::clone(&mcp_session_store),
voice_providers,
image_cache.clone(),
));
timer_manager.set_serve_state(Arc::downgrade(&serve_state));
if config.standby_mode {
tracing::info!(
"Standby mode enabled: git sync only, skipping channel and heartbeat"
);
}
let mut agent_handle: Option<tokio::task::JoinHandle<()>> = None;
if !config.standby_mode && (config.matrix.is_some() || config.discord.is_some()) {
let channel_session_store = Arc::new(SessionStore::with_workspace(
sessions_base.clone(),
"channel",
Arc::clone(&ws_state),
));
let mut channel_list: Vec<(String, Arc<dyn channel::Channel>)> = Vec::new();
if let Some(m) = &config.matrix {
channel_list.push(("matrix".to_string(), Arc::new(MatrixChannel::new(m))));
}
if let Some(d) = &config.discord {
channel_list.push((
"discord".to_string(),
Arc::new(
DiscordChannel::new(d)
.context("Failed to initialise Discord channel")?,
),
));
}
let channels = Arc::new(channel::Channels::new(
channel_list,
channel::seed_routing_from_config(&config),
));
tracing::info!("Channels active: {}", channels.names().join(", "));
let today_local = session::local_date_for_timestamp(
chrono::Local::now(),
config.day_boundary_hour,
);
for ns in config.all_memory_namespaces() {
let provider = registry.background_provider_for_namespace(&config, &ns);
let cfg_for_predicate = config.clone();
let ns_for_predicate = ns.clone();
let predicate = move |meta: &session::SessionMeta| -> bool {
if meta.channel == "rpc" {
return ns_for_predicate == config::DEFAULT_NAMESPACE_NAME;
}
cfg_for_predicate.namespace_for_room(&meta.room_id) == ns_for_predicate
};
catchup_pending_daily_logs(
&channel_session_store,
provider.as_ref(),
&ws_state,
&workspace_dir,
&ns,
config.day_boundary_hour,
&predicate,
)
.await;
catchup_missing_daily_digests(
provider.as_ref(),
&ws_state,
&workspace_dir,
&ns,
)
.await;
if config.digest.weekly_enabled {
catchup_pending_weekly_logs(
provider.as_ref(),
&ws_state,
&workspace_dir,
&ns,
today_local,
)
.await;
}
if config.digest.monthly_enabled {
catchup_pending_monthly_logs(
provider.as_ref(),
&ws_state,
&workspace_dir,
&ns,
today_local,
)
.await;
}
if config.digest.yearly_enabled {
catchup_pending_yearly_logs(
provider.as_ref(),
&ws_state,
&workspace_dir,
&ns,
today_local,
)
.await;
}
}
let agent = Arc::new(Agent::new(
config.clone(),
Arc::clone(&channels),
Arc::clone(®istry),
Arc::clone(&workspace),
Some(Arc::clone(&tool_set)),
Arc::clone(&channel_session_store),
image_cache.clone(),
));
agent.bootstrap().await;
timer_manager.set_agent(Arc::downgrade(&agent));
if let Some(dur) = ws_sync_interval {
tracing::info!("Periodic workspace sync enabled: every {}s", dur.as_secs());
let ws = Arc::clone(&ws_state);
let cfg_for_loop = config.clone();
let workspace_for_loop = Arc::clone(&workspace);
let workspace_dir_for_loop = workspace_dir.clone();
let channel_store_for_loop = Arc::clone(&channel_session_store);
let cross_device_store_for_loop = Arc::clone(&cross_device_session_store);
let device_default_store_for_loop = Arc::clone(&device_default_session_store);
let agent_for_loop = Arc::clone(&agent);
tokio::spawn(async move {
let mut tick = tokio::time::interval(dur);
tick.tick().await; loop {
tick.tick().await;
{
let state = ws.lock().expect("ws_state mutex poisoned");
match state.periodic_sync() {
Ok((u, r)) => tracing::info!(
"Periodic ws sync: {u} upserted, {r} removed"
),
Err(e) => tracing::warn!("Periodic ws sync failed: {e:#}"),
}
}
rebuild_today_digests(
&cfg_for_loop,
&workspace_for_loop,
&workspace_dir_for_loop,
&channel_store_for_loop,
&cross_device_store_for_loop,
&device_default_store_for_loop,
&agent_for_loop,
)
.await;
}
});
} else {
rebuild_today_digests(
&config,
&workspace,
&workspace_dir,
&channel_session_store,
&cross_device_session_store,
&device_default_session_store,
&agent,
)
.await;
}
let default_room_id = config
.matrix
.as_ref()
.and_then(|m| m.primary_room_id().map(str::to_string))
.or_else(|| {
config
.discord
.as_ref()
.and_then(|d| d.channel_ids.first().cloned())
});
let heartbeat = Heartbeat {
workspace_dir: workspace_dir.clone(),
ws_state: Arc::clone(&ws_state),
day_boundary_hour: config.day_boundary_hour,
daily_log_enabled: config.daily_log_enabled,
memory_compaction_enabled: config.memory_compaction_enabled,
digest_cfg: config.digest.clone(),
session_store: Arc::clone(&channel_session_store),
registry: Arc::clone(®istry),
agent: Arc::clone(&agent),
default_room_id,
config: config.clone(),
serve_state: Some(Arc::clone(&serve_state)),
};
if config.heartbeat_enabled {
heartbeat.spawn();
} else {
tracing::info!("Heartbeat disabled by config");
}
let agent_run = Arc::clone(&agent);
agent_handle = Some(tokio::spawn(async move {
if let Err(e) = agent_run.run().await {
tracing::error!("Agent error: {e:#}");
}
}));
}
if config.standby_mode {
tracing::info!("Standby mode: waiting for shutdown signal (Ctrl-C)");
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl-C");
tracing::info!("Shutting down standby process");
} else {
let addr = bind
.or_else(|| {
config
.serve
.as_ref()
.map(|s| format!("{}:{}", s.host, s.port))
})
.unwrap_or_else(|| "127.0.0.1:9000".to_string());
serve::run(addr, Arc::clone(&serve_state)).await?;
}
if let Some(handle) = agent_handle
&& let Err(e) = handle.await
{
tracing::warn!("Agent task did not finish cleanly: {e}");
}
}
}
Ok(())
}
async fn rebuild_today_digests(
config: &Config,
workspace: &Arc<Workspace>,
workspace_dir: &std::path::Path,
channel_store: &Arc<SessionStore>,
cross_device_store: &Arc<SessionStore>,
device_default_store: &Arc<SessionStore>,
agent: &Arc<Agent>,
) {
let today = session::local_date_for_timestamp(chrono::Local::now(), config.day_boundary_hour);
let namespaces = config.all_memory_namespaces();
let cfg = config.clone();
let map = build_all_today_digests(
&namespaces,
today,
config.day_boundary_hour,
channel_store,
Some(cross_device_store.as_ref()),
Some(device_default_store.as_ref()),
|room_id: &str| cfg.namespace_for_room(room_id).to_string(),
);
let had_content = !map.is_empty();
workspace.replace_today_digests(map).await;
let _ = workspace_dir; if had_content {
agent.invalidate_system_prompts().await;
}
}
fn migrate_sessions_to_namespaced_layout(sessions_base: &std::path::Path) -> anyhow::Result<()> {
use std::io::{BufRead, BufReader};
for kind in ["channel", "api", "rpc"] {
let kind_dir = sessions_base.join(kind);
if !kind_dir.is_dir() {
continue;
}
let entries = match std::fs::read_dir(&kind_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Skipping {}: {e}", kind_dir.display());
continue;
}
};
let mut moved = 0;
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
if path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
continue;
}
let namespace = match std::fs::File::open(&path) {
Ok(f) => {
let mut first = String::new();
let _ = BufReader::new(f).read_line(&mut first);
serde_json::from_str::<serde_json::Value>(first.trim())
.ok()
.and_then(|v| {
v.get("meta")
.and_then(|m| m.get("namespace"))
.and_then(|n| n.as_str())
.map(str::to_string)
})
.unwrap_or_else(|| config::DEFAULT_NAMESPACE_NAME.to_string())
}
Err(_) => config::DEFAULT_NAMESPACE_NAME.to_string(),
};
let Some(file_name) = path.file_name() else {
continue;
};
let dst_dir = sessions_base.join(&namespace).join(kind);
std::fs::create_dir_all(&dst_dir)
.with_context(|| format!("create {}", dst_dir.display()))?;
let dst = dst_dir.join(file_name);
if dst.exists() {
anyhow::bail!(
"Refusing to migrate {}: destination {} already exists. \
Reconcile manually before starting.",
path.display(),
dst.display(),
);
}
std::fs::rename(&path, &dst)
.with_context(|| format!("rename {} -> {}", path.display(), dst.display()))?;
moved += 1;
}
let _ = std::fs::remove_dir(&kind_dir);
if moved > 0 {
tracing::info!(
"Migrated {} session file(s) from {} into namespaced layout",
moved,
kind_dir.display()
);
}
}
Ok(())
}
fn migrate_to_device_default_layout(sessions_base: &std::path::Path) -> anyhow::Result<()> {
use std::io::{BufRead, BufReader, Write};
let Ok(ns_entries) = std::fs::read_dir(sessions_base) else {
return Ok(());
};
let mut moved = 0usize;
let mut quarantined = 0usize;
for ns_entry in ns_entries.flatten() {
let ns_dir = ns_entry.path();
if !ns_dir.is_dir() {
continue;
}
for src_kind in ["api", "rpc"] {
let src_dir = ns_dir.join(src_kind);
if !src_dir.is_dir() {
continue;
}
let entries = match std::fs::read_dir(&src_dir) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Skipping {}: {e}", src_dir.display());
continue;
}
};
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() || path.extension().and_then(|e| e.to_str()) != Some("jsonl") {
continue;
}
let Some(file_name) = path.file_name() else {
continue;
};
let is_voice = file_name
.to_str()
.map(|s| s.starts_with("voice-"))
.unwrap_or(false);
let (dst_dir, needs_meta_rewrite) = if is_voice {
(ns_dir.join("legacy-voice"), false)
} else {
(ns_dir.join("cross-device"), src_kind == "api")
};
std::fs::create_dir_all(&dst_dir)
.with_context(|| format!("create {}", dst_dir.display()))?;
let dst = dst_dir.join(file_name);
if dst.exists() {
anyhow::bail!(
"Refusing to migrate {}: destination {} already exists. \
Reconcile manually before starting.",
path.display(),
dst.display(),
);
}
if needs_meta_rewrite {
let tmp = dst.with_extension("partial");
let src_file = std::fs::File::open(&path)
.with_context(|| format!("open {}", path.display()))?;
let mut reader = BufReader::new(src_file);
let mut first_line = String::new();
reader
.read_line(&mut first_line)
.with_context(|| format!("read first line of {}", path.display()))?;
let rewritten = rewrite_meta_channel(first_line.trim(), "rpc")?;
let mut writer = std::fs::File::create(&tmp)
.with_context(|| format!("create {}", tmp.display()))?;
writeln!(writer, "{rewritten}")
.with_context(|| format!("write {}", tmp.display()))?;
std::io::copy(&mut reader, &mut writer)
.with_context(|| format!("copy body of {}", path.display()))?;
drop(writer);
std::fs::rename(&tmp, &dst).with_context(|| {
format!("rename {} -> {}", tmp.display(), dst.display())
})?;
std::fs::remove_file(&path)
.with_context(|| format!("remove {}", path.display()))?;
} else {
std::fs::rename(&path, &dst).with_context(|| {
format!("rename {} -> {}", path.display(), dst.display())
})?;
}
if is_voice {
quarantined += 1;
} else {
moved += 1;
}
}
let _ = std::fs::remove_dir(&src_dir);
}
}
if moved > 0 || quarantined > 0 {
tracing::info!(
"Session migration: {moved} cross-device session(s), \
{quarantined} legacy voice file(s) quarantined under legacy-voice/"
);
}
Ok(())
}
fn rewrite_meta_channel(line: &str, new_channel: &str) -> anyhow::Result<String> {
let mut value: serde_json::Value =
serde_json::from_str(line).with_context(|| format!("parse meta line: {line}"))?;
if let Some(meta) = value.get_mut("meta").and_then(|m| m.as_object_mut()) {
meta.insert(
"channel".to_string(),
serde_json::Value::String(new_channel.to_string()),
);
} else {
anyhow::bail!("first JSONL line missing `meta` object: {line}");
}
Ok(serde_json::to_string(&value)?)
}
fn migrate_per_channel_sessions(sessions_base: &std::path::Path) -> anyhow::Result<()> {
let target = sessions_base.join("channel");
let mut sources: Vec<std::path::PathBuf> = Vec::new();
for legacy in ["matrix", "discord"] {
let dir = sessions_base.join(legacy);
if dir.is_dir() {
sources.push(dir);
}
}
if sources.is_empty() {
return Ok(());
}
std::fs::create_dir_all(&target).with_context(|| format!("create {}", target.display()))?;
for src in sources {
let entries = match std::fs::read_dir(&src) {
Ok(e) => e,
Err(e) => {
tracing::warn!("Skipping {}: {e}", src.display());
continue;
}
};
let mut moved = 0;
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
let Some(file_name) = path.file_name() else {
continue;
};
let dst = target.join(file_name);
if dst.exists() {
anyhow::bail!(
"Refusing to migrate {}: destination {} already exists. \
Reconcile manually before starting.",
path.display(),
dst.display(),
);
}
std::fs::rename(&path, &dst)
.with_context(|| format!("rename {} -> {}", path.display(), dst.display()))?;
moved += 1;
}
let _ = std::fs::remove_dir(&src);
tracing::info!(
"Migrated {} session file(s) from {} to {}",
moved,
src.display(),
target.display()
);
}
Ok(())
}
fn migrate_pre_namespace_layout(workspace_dir: &std::path::Path) -> anyhow::Result<()> {
let memory_root = workspace_dir.join("memory");
let has_top_memory_md = ["MEMORY.md", "memory.md"]
.iter()
.any(|f| workspace_dir.join(f).is_file());
if !memory_root.is_dir() && !has_top_memory_md {
return Ok(());
}
let default_root = memory_root.join(config::DEFAULT_NAMESPACE_NAME);
let kinds = ["daily", "weekly", "monthly", "yearly"];
let pre_dirs: Vec<_> = kinds
.iter()
.filter(|k| memory_root.join(k).is_dir())
.copied()
.collect();
let pre_memory_md = ["MEMORY.md", "memory.md"]
.iter()
.map(|f| workspace_dir.join(f))
.find(|p| p.is_file());
if pre_dirs.is_empty() && pre_memory_md.is_none() {
return Ok(());
}
if default_root.is_dir() && !pre_dirs.is_empty() {
anyhow::bail!(
"Both pre-namespace memory dirs ({}) and memory/{} exist — refuse to migrate. \
Reconcile manually before starting.",
pre_dirs.join(", "),
config::DEFAULT_NAMESPACE_NAME,
);
}
std::fs::create_dir_all(&default_root)
.with_context(|| format!("create {}", default_root.display()))?;
for kind in &pre_dirs {
let src = memory_root.join(kind);
let dst = default_root.join(kind);
std::fs::rename(&src, &dst)
.with_context(|| format!("rename {} -> {}", src.display(), dst.display()))?;
tracing::info!(
"Migrated memory subdir: {} -> {}",
src.display(),
dst.display()
);
}
if let Some(src) = pre_memory_md {
let dst = default_root.join("MEMORY.md");
std::fs::rename(&src, &dst)
.with_context(|| format!("rename {} -> {}", src.display(), dst.display()))?;
tracing::info!("Migrated MEMORY.md: {} -> {}", src.display(), dst.display());
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn write_stub(path: &std::path::Path, body: &str) {
std::fs::create_dir_all(path.parent().unwrap()).unwrap();
std::fs::write(path, body).unwrap();
}
#[test]
fn migration_no_op_on_fresh_workspace() {
let td = tempfile::tempdir().unwrap();
migrate_pre_namespace_layout(td.path()).expect("clean migration");
assert!(!td.path().join("memory").exists());
}
#[test]
fn migration_moves_pre_namespace_layout() {
let td = tempfile::tempdir().unwrap();
let root = td.path();
write_stub(&root.join("memory/daily/2026-04-15.md"), "daily body");
write_stub(&root.join("memory/weekly/2026-W16.md"), "weekly body");
write_stub(&root.join("memory/monthly/2026-04.md"), "monthly body");
write_stub(&root.join("memory/yearly/2026.md"), "yearly body");
write_stub(&root.join("MEMORY.md"), "core memory");
migrate_pre_namespace_layout(root).expect("migration");
assert_eq!(
std::fs::read_to_string(root.join("memory/default/daily/2026-04-15.md")).unwrap(),
"daily body"
);
assert_eq!(
std::fs::read_to_string(root.join("memory/default/weekly/2026-W16.md")).unwrap(),
"weekly body"
);
assert_eq!(
std::fs::read_to_string(root.join("memory/default/monthly/2026-04.md")).unwrap(),
"monthly body"
);
assert_eq!(
std::fs::read_to_string(root.join("memory/default/yearly/2026.md")).unwrap(),
"yearly body"
);
assert_eq!(
std::fs::read_to_string(root.join("memory/default/MEMORY.md")).unwrap(),
"core memory"
);
assert!(!root.join("memory/daily").exists());
assert!(!root.join("MEMORY.md").exists());
}
#[test]
fn migration_accepts_lowercase_memory_md() {
let td = tempfile::tempdir().unwrap();
let root = td.path();
write_stub(&root.join("memory.md"), "lowercase memory");
migrate_pre_namespace_layout(root).expect("migration");
assert_eq!(
std::fs::read_to_string(root.join("memory/default/MEMORY.md")).unwrap(),
"lowercase memory"
);
}
#[test]
fn migration_is_idempotent() {
let td = tempfile::tempdir().unwrap();
let root = td.path();
write_stub(&root.join("memory/daily/2026-04-15.md"), "daily body");
migrate_pre_namespace_layout(root).expect("first migration");
migrate_pre_namespace_layout(root).expect("idempotent migration");
assert_eq!(
std::fs::read_to_string(root.join("memory/default/daily/2026-04-15.md")).unwrap(),
"daily body"
);
}
#[test]
fn migration_refuses_when_layouts_coexist() {
let td = tempfile::tempdir().unwrap();
let root = td.path();
write_stub(&root.join("memory/daily/2026-04-15.md"), "old");
write_stub(&root.join("memory/default/daily/2026-04-16.md"), "new");
let err = migrate_pre_namespace_layout(root).expect_err("error");
assert!(format!("{err:#}").contains("Reconcile manually"));
assert!(root.join("memory/daily/2026-04-15.md").exists());
assert!(root.join("memory/default/daily/2026-04-16.md").exists());
}
#[test]
fn session_migration_no_op_on_fresh_workspace() {
let td = tempfile::tempdir().unwrap();
migrate_per_channel_sessions(td.path()).expect("clean migration");
assert!(!td.path().join("channel").exists());
}
#[test]
fn session_migration_consolidates_matrix_and_discord() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
write_stub(&base.join("matrix/01H1.jsonl"), "matrix");
write_stub(&base.join("matrix/01H2.jsonl"), "matrix2");
write_stub(&base.join("discord/01H3.jsonl"), "discord");
migrate_per_channel_sessions(base).expect("migration");
for stem in ["01H1", "01H2", "01H3"] {
let path = base.join("channel").join(format!("{stem}.jsonl"));
assert!(path.exists(), "missing: {}", path.display());
}
assert!(
!base.join("matrix").exists(),
"matrix dir should be removed"
);
assert!(
!base.join("discord").exists(),
"discord dir should be removed"
);
}
#[test]
fn session_migration_is_idempotent() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
write_stub(&base.join("matrix/01H1.jsonl"), "matrix");
migrate_per_channel_sessions(base).expect("first");
migrate_per_channel_sessions(base).expect("second is a no-op");
assert!(base.join("channel/01H1.jsonl").exists());
}
#[test]
fn session_migration_refuses_on_collision() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
write_stub(&base.join("matrix/01H1.jsonl"), "matrix");
write_stub(&base.join("channel/01H1.jsonl"), "pre-existing");
let err = migrate_per_channel_sessions(base).expect_err("error");
assert!(
format!("{err:#}").contains("already exists"),
"got: {err:#}"
);
assert_eq!(
std::fs::read_to_string(base.join("matrix/01H1.jsonl")).unwrap(),
"matrix"
);
assert_eq!(
std::fs::read_to_string(base.join("channel/01H1.jsonl")).unwrap(),
"pre-existing"
);
}
fn meta_line(session_id: &str, channel: &str) -> String {
format!(
r#"{{"meta":{{"session_id":"{session_id}","room_id":"","thread_id":null,"channel":"{channel}","created_at":"2026-05-22T00:00:00Z","namespace":"default"}}}}"#
)
}
#[test]
fn device_default_migration_no_op_on_fresh_workspace() {
let td = tempfile::tempdir().unwrap();
migrate_to_device_default_layout(td.path()).expect("clean migration");
assert!(!td.path().join("default").exists());
}
#[test]
fn device_default_migration_moves_cross_device_and_quarantines_voice() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
let ns = base.join("default");
write_stub(
&ns.join("api").join("uuid-text.jsonl"),
&meta_line("uuid-text", "api"),
);
write_stub(
&ns.join("rpc").join("uuid-text-modern.jsonl"),
&meta_line("uuid-text-modern", "rpc"),
);
write_stub(
&ns.join("api").join("voice-deadbeef.jsonl"),
&meta_line("voice-deadbeef", "api"),
);
write_stub(
&ns.join("rpc").join("voice-feedface.jsonl"),
&meta_line("voice-feedface", "rpc"),
);
migrate_to_device_default_layout(base).expect("migration");
assert!(ns.join("cross-device/uuid-text.jsonl").exists());
assert!(ns.join("cross-device/uuid-text-modern.jsonl").exists());
assert!(ns.join("legacy-voice/voice-deadbeef.jsonl").exists());
assert!(ns.join("legacy-voice/voice-feedface.jsonl").exists());
assert!(!ns.join("api").exists(), "api dir should be removed");
assert!(!ns.join("rpc").exists(), "rpc dir should be removed");
}
#[test]
fn device_default_migration_rewrites_legacy_meta_channel() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
let ns = base.join("default");
write_stub(
&ns.join("api").join("uuid-from-api.jsonl"),
&meta_line("uuid-from-api", "api"),
);
write_stub(
&ns.join("rpc").join("uuid-from-rpc.jsonl"),
&meta_line("uuid-from-rpc", "rpc"),
);
write_stub(
&ns.join("api").join("voice-cafe.jsonl"),
&meta_line("voice-cafe", "api"),
);
migrate_to_device_default_layout(base).expect("migration");
let api_migrated =
std::fs::read_to_string(ns.join("cross-device/uuid-from-api.jsonl")).unwrap();
assert!(
api_migrated.contains(r#""channel":"rpc""#),
"legacy `api` meta must be rewritten to `rpc`: {api_migrated}"
);
assert!(
!api_migrated.contains(r#""channel":"api""#),
"legacy `api` meta should no longer appear: {api_migrated}"
);
let rpc_migrated =
std::fs::read_to_string(ns.join("cross-device/uuid-from-rpc.jsonl")).unwrap();
assert!(
rpc_migrated.contains(r#""channel":"rpc""#),
"post-PR1 `rpc` meta stays as-is: {rpc_migrated}"
);
let voice_quarantined =
std::fs::read_to_string(ns.join("legacy-voice/voice-cafe.jsonl")).unwrap();
assert!(
voice_quarantined.contains(r#""channel":"api""#),
"voice meta is preserved verbatim (no rewrite): {voice_quarantined}"
);
}
#[test]
fn device_default_migration_is_idempotent() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
let ns = base.join("default");
write_stub(
&ns.join("api").join("uuid-text.jsonl"),
&meta_line("uuid-text", "api"),
);
migrate_to_device_default_layout(base).expect("first");
migrate_to_device_default_layout(base).expect("second is a no-op");
assert!(ns.join("cross-device/uuid-text.jsonl").exists());
}
#[test]
fn device_default_migration_refuses_on_collision() {
let td = tempfile::tempdir().unwrap();
let base = td.path();
let ns = base.join("default");
write_stub(
&ns.join("api").join("uuid-text.jsonl"),
&meta_line("uuid-text", "api"),
);
write_stub(
&ns.join("cross-device").join("uuid-text.jsonl"),
"pre-existing",
);
let err = migrate_to_device_default_layout(base).expect_err("collision");
assert!(
format!("{err:#}").contains("already exists"),
"got: {err:#}"
);
assert!(ns.join("api/uuid-text.jsonl").exists());
assert_eq!(
std::fs::read_to_string(ns.join("cross-device/uuid-text.jsonl")).unwrap(),
"pre-existing"
);
}
}