use std::collections::HashMap;
use std::fs;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use serde::Deserialize;
use tracing::{debug, info, warn};
/// Index mapping an OpenClaw session key (e.g. `agent:<id>:<channel>:...`)
/// to its descriptor, as stored in `agents/<id>/sessions/sessions.json`.
pub type SessionsIndex = HashMap<String, SessionDescriptor>;
/// One `sessions.json` entry: metadata for a recorded OpenClaw session.
/// Everything but `session_id` is optional so partial/older indexes still parse.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionDescriptor {
    /// Stable session id; also the default `<session_id>.jsonl` file stem
    /// (see `resolve_jsonl_path`).
    pub session_id: String,
    /// Last-update timestamp. Unit (secs vs millis) not visible here — TODO confirm.
    #[serde(default)]
    pub updated_at: Option<u64>,
    #[serde(default)]
    pub chat_type: Option<String>,
    #[serde(default)]
    pub last_channel: Option<String>,
    #[serde(default)]
    pub compaction_count: Option<u32>,
    /// Optional explicit path to the transcript JSONL (may be absolute and stale).
    #[serde(default)]
    pub session_file: Option<String>,
    /// Provenance of the session (provider/channel), when recorded.
    #[serde(default)]
    pub origin: Option<SessionOrigin>,
}
/// Origin metadata attached to a session descriptor. All fields are optional
/// free-form strings supplied by the OpenClaw side.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionOrigin {
    #[serde(default)]
    pub provider: Option<String>,
    #[serde(default)]
    pub chat_type: Option<String>,
    #[serde(default)]
    pub label: Option<String>,
    #[serde(default)]
    pub from: Option<String>,
}
/// One line of a session transcript JSONL file.
///
/// Only two event kinds are consumed downstream: `"message"` events carry a
/// `message` payload, and `"custom"` events with `custom_type == "memory_put"`
/// carry a memory key/value in `data`. Everything else is skipped by callers.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionEvent {
    /// Event discriminator (JSON field `type`), e.g. "message" or "custom".
    #[serde(rename = "type")]
    pub event_type: String,
    #[serde(default)]
    pub id: Option<String>,
    #[serde(default)]
    pub timestamp: Option<String>,
    /// Present for `type == "message"` events.
    #[serde(default)]
    pub message: Option<MessagePayload>,
    #[serde(default)]
    pub model_id: Option<String>,
    /// Sub-kind of `type == "custom"` events (e.g. "memory_put").
    #[serde(default)]
    pub custom_type: Option<String>,
    /// Free-form payload for custom events.
    #[serde(default)]
    pub data: Option<serde_json::Value>,
    #[serde(default)]
    pub cwd: Option<String>,
    #[serde(default)]
    pub version: Option<u32>,
}
/// Chat message payload inside a `"message"` event.
/// Note: no `rename_all` here — JSON field names are used verbatim.
#[derive(Debug, Clone, Deserialize)]
pub struct MessagePayload {
    /// Speaker role, e.g. "user" / "assistant".
    pub role: String,
    /// Either a plain string or an array of content blocks; flattened by
    /// `extract_text_content`.
    #[serde(default)]
    pub content: Option<serde_json::Value>,
    #[serde(default)]
    pub model: Option<String>,
    /// Token-usage blob, passed through untyped.
    #[serde(default)]
    pub usage: Option<serde_json::Value>,
}
/// Summary counters produced by `scan_openclaw` (a dry-run inventory;
/// nothing is imported while scanning).
#[derive(Debug, Default)]
pub struct OpenClawScanResult {
    pub agent_ids: Vec<String>,
    pub total_sessions: usize,
    pub total_jsonl_files: usize,
    pub total_memories: usize,
    pub total_memory_md_files: usize,
    pub total_memory_dbs: usize,
    pub total_workspaces: usize,
    pub total_skills: usize,
    /// Session count per agent id, taken from each agent's sessions.json.
    pub sessions_per_agent: HashMap<String, usize>,
    /// Whether `openclaw.json` exists at the root of the scanned directory.
    pub has_config: bool,
    pub total_cron_jobs: usize,
}
/// A transcript message flattened into plain text for import.
#[derive(Debug, Clone)]
pub struct ConvertedMessage {
    pub role: String,
    pub content: String,
    pub model: Option<String>,
    pub timestamp: Option<String>,
}
/// A `memory_put` key/value extracted from a transcript.
#[derive(Debug, Clone)]
pub struct ConvertedMemory {
    pub key: String,
    pub value: String,
    /// Filled in by the caller; readers initially leave this empty.
    pub agent_id: String,
}
/// A whole session converted to the RsClaw key scheme, ready for import.
#[derive(Debug)]
pub struct ConvertedSession {
    /// RsClaw-side key (always `agent:`-prefixed; see `make_rsclaw_session_key`).
    pub session_key: String,
    /// Original OpenClaw session id, kept for traceability.
    pub openclaw_session_id: String,
    pub agent_id: String,
    pub messages: Vec<ConvertedMessage>,
    pub updated_at: Option<u64>,
}
/// Counters accumulated during a full JSONL import run.
#[derive(Debug, Default)]
pub struct ImportStats {
    pub sessions: usize,
    pub messages: usize,
    pub memories: usize,
    pub workspace_files: usize,
    pub skills: usize,
    pub aliases: usize,
    pub errors: usize,
}
/// Dry-run inventory of an OpenClaw state directory.
///
/// Counts agents, indexed sessions, transcript JSONL files, embedded
/// memories, workspaces (their memory markdown, brain.db and skills),
/// standalone memory databases and cron jobs. Nothing is modified or
/// imported; malformed pieces are logged and skipped.
///
/// # Errors
/// Fails only when a directory listing that should be readable cannot be read.
pub fn scan_openclaw(dir: &Path) -> Result<OpenClawScanResult> {
    let mut result = OpenClawScanResult::default();
    result.has_config = dir.join("openclaw.json").is_file();
    let agents_dir = dir.join("agents");
    if !agents_dir.is_dir() {
        info!(path = %dir.display(), "no agents/ directory found");
        return Ok(result);
    }
    let entries = fs::read_dir(&agents_dir)
        .with_context(|| format!("read agents dir: {}", agents_dir.display()))?;
    for entry in entries.flatten() {
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }
        let agent_id = path
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        if agent_id.is_empty() {
            continue;
        }
        result.agent_ids.push(agent_id.clone());
        let sessions_dir = path.join("sessions");
        if !sessions_dir.is_dir() {
            continue;
        }
        // Session counts come from the index file; a broken index is non-fatal.
        let index_path = sessions_dir.join("sessions.json");
        if index_path.is_file() {
            match read_sessions_index(&index_path) {
                Ok(index) => {
                    let count = index.len();
                    result.total_sessions += count;
                    result.sessions_per_agent.insert(agent_id.clone(), count);
                }
                Err(e) => {
                    warn!(
                        agent = %agent_id,
                        error = %e,
                        "failed to parse sessions.json"
                    );
                }
            }
        }
        // Count transcript files and the memory_put events inside them.
        if let Ok(entries) = fs::read_dir(&sessions_dir) {
            for file_entry in entries.flatten() {
                let fpath = file_entry.path();
                if fpath.extension().and_then(|e| e.to_str()) == Some("jsonl") {
                    result.total_jsonl_files += 1;
                    if let Ok(memories) = read_custom_memories(&fpath) {
                        result.total_memories += memories.len();
                    }
                }
            }
        }
    }
    // Single pass over `workspace*` directories. (Previously the top-level
    // directory was listed twice over the same entries: once for workspace
    // stats and once more just to find memory/brain.db files.)
    for ws_entry in fs::read_dir(dir)?.flatten() {
        let ws_path = ws_entry.path();
        let name = ws_path
            .file_name()
            .unwrap_or_default()
            .to_string_lossy()
            .to_string();
        if !ws_path.is_dir() || !name.starts_with("workspace") {
            continue;
        }
        result.total_workspaces += 1;
        if ws_path.join("MEMORY.md").is_file() {
            result.total_memory_md_files += 1;
        }
        let mem_dir = ws_path.join("memory");
        if mem_dir.is_dir() {
            if let Ok(md_entries) = fs::read_dir(&mem_dir) {
                result.total_memory_md_files += md_entries
                    .flatten()
                    .filter(|e| e.path().extension().and_then(|x| x.to_str()) == Some("md"))
                    .count();
            }
            if mem_dir.join("brain.db").is_file() {
                result.total_memory_dbs += 1;
            }
        }
        let skills_dir = ws_path.join("skills");
        if skills_dir.is_dir() {
            if let Ok(skill_entries) = fs::read_dir(&skills_dir) {
                result.total_skills += skill_entries
                    .flatten()
                    .filter(|e| e.path().is_dir())
                    .count();
            }
        }
    }
    // Standalone memory databases at <dir>/memory/*.{sqlite,db}.
    let memory_dir = dir.join("memory");
    if memory_dir.is_dir() {
        if let Ok(mem_entries) = fs::read_dir(&memory_dir) {
            result.total_memory_dbs += mem_entries
                .flatten()
                .filter(|e| {
                    let p = e.path();
                    let ext = p.extension().and_then(|ext| ext.to_str()).unwrap_or("");
                    ext == "sqlite" || ext == "db"
                })
                .count();
        }
    }
    // Best-effort count of the `jobs` array in cron/jobs.json.
    let cron_path = dir.join("cron/jobs.json");
    if cron_path.is_file() {
        if let Ok(data) = fs::read_to_string(&cron_path) {
            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&data) {
                result.total_cron_jobs = val["jobs"].as_array().map(|a| a.len()).unwrap_or(0);
            }
        }
    }
    debug!(
        agents = result.agent_ids.len(),
        sessions = result.total_sessions,
        jsonl_files = result.total_jsonl_files,
        memories = result.total_memories,
        memory_md = result.total_memory_md_files,
        memory_dbs = result.total_memory_dbs,
        workspaces = result.total_workspaces,
        skills = result.total_skills,
        cron_jobs = result.total_cron_jobs,
        "OpenClaw scan complete"
    );
    Ok(result)
}
/// Load and deserialize an agent's `sessions.json` index file.
pub fn read_sessions_index(path: &Path) -> Result<SessionsIndex> {
    let raw = fs::read_to_string(path)
        .with_context(|| format!("read sessions.json: {}", path.display()))?;
    let parsed: SessionsIndex = serde_json::from_str(&raw)
        .with_context(|| format!("parse sessions.json: {}", path.display()))?;
    debug!(path = %path.display(), sessions = parsed.len(), "parsed sessions index");
    Ok(parsed)
}
/// Stream a transcript JSONL file and collect its chat messages.
///
/// Unreadable or unparseable lines are logged and skipped; only `"message"`
/// events whose extracted text is non-empty are kept.
pub fn read_session_messages(jsonl_path: &Path) -> Result<Vec<ConvertedMessage>> {
    let file = fs::File::open(jsonl_path)
        .with_context(|| format!("open JSONL: {}", jsonl_path.display()))?;
    let mut collected = Vec::new();
    for (idx, read) in BufReader::new(file).lines().enumerate() {
        let raw = match read {
            Ok(l) => l,
            Err(e) => {
                warn!(
                    path = %jsonl_path.display(),
                    line = idx + 1,
                    error = %e,
                    "failed to read line"
                );
                continue;
            }
        };
        let text = raw.trim();
        if text.is_empty() {
            continue;
        }
        let event: SessionEvent = match serde_json::from_str(text) {
            Ok(ev) => ev,
            Err(e) => {
                debug!(
                    path = %jsonl_path.display(),
                    line = idx + 1,
                    error = %e,
                    "skipping unparseable JSONL line"
                );
                continue;
            }
        };
        if event.event_type != "message" {
            continue;
        }
        let Some(payload) = event.message else {
            continue;
        };
        let body = extract_text_content(&payload.content);
        if body.is_empty() {
            continue;
        }
        collected.push(ConvertedMessage {
            role: payload.role,
            content: body,
            model: payload.model,
            timestamp: event.timestamp,
        });
    }
    debug!(
        path = %jsonl_path.display(),
        messages = collected.len(),
        "parsed session JSONL"
    );
    Ok(collected)
}
fn extract_text_content(content: &Option<serde_json::Value>) -> String {
match content {
None => String::new(),
Some(serde_json::Value::String(s)) => s.clone(),
Some(serde_json::Value::Array(blocks)) => {
let mut parts = Vec::new();
for block in blocks {
if let Some(obj) = block.as_object() {
let block_type = obj.get("type").and_then(|t| t.as_str()).unwrap_or("");
match block_type {
"text" => {
if let Some(text) = obj.get("text").and_then(|t| t.as_str()) {
parts.push(text.to_owned());
}
}
"thinking" => {
if let Some(text) = obj.get("thinking").and_then(|t| t.as_str()) {
parts.push(format!("[thinking] {text}"));
}
}
_ => {}
}
}
}
parts.join("\n")
}
Some(_) => String::new(),
}
}
/// Collect `memory_put` entries from a transcript JSONL file.
///
/// Scans every line for `"custom"` events whose `custom_type` is
/// `"memory_put"` and returns their non-empty key/value pairs. The
/// `agent_id` field is left empty for the caller to fill in.
pub fn read_custom_memories(jsonl_path: &Path) -> Result<Vec<ConvertedMemory>> {
    let file = fs::File::open(jsonl_path)
        .with_context(|| format!("open JSONL for memories: {}", jsonl_path.display()))?;
    let mut found = Vec::new();
    for read in BufReader::new(file).lines() {
        let Ok(raw) = read else { continue };
        let text = raw.trim();
        if text.is_empty() {
            continue;
        }
        let Ok(event) = serde_json::from_str::<SessionEvent>(text) else {
            continue;
        };
        let is_memory_put =
            event.event_type == "custom" && event.custom_type.as_deref() == Some("memory_put");
        if !is_memory_put {
            continue;
        }
        let Some(data) = event.data else { continue };
        let pick = |field: &str| {
            data.get(field)
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_owned()
        };
        let (key, value) = (pick("key"), pick("value"));
        if !key.is_empty() && !value.is_empty() {
            found.push(ConvertedMemory {
                key,
                value,
                agent_id: String::new(),
            });
        }
    }
    Ok(found)
}
/// Locate the transcript JSONL for a session descriptor.
///
/// Preference order: the descriptor's recorded path as-is, then a file with
/// the same name inside `sessions_dir`, finally `<session_id>.jsonl` in
/// `sessions_dir` (returned even if it does not exist).
pub fn resolve_jsonl_path(sessions_dir: &Path, descriptor: &SessionDescriptor) -> PathBuf {
    if let Some(recorded) = descriptor.session_file.as_deref() {
        let recorded = PathBuf::from(recorded);
        if recorded.is_file() {
            return recorded;
        }
        // The recorded path may be stale (e.g. from another machine); retry
        // with just its file name relative to the local sessions directory.
        if let Some(fname) = recorded.file_name() {
            let candidate = sessions_dir.join(fname);
            if candidate.is_file() {
                return candidate;
            }
        }
    }
    sessions_dir.join(format!("{}.jsonl", descriptor.session_id))
}
/// Convert an OpenClaw session key into the RsClaw key scheme.
///
/// Keys already namespaced with `agent:` are kept verbatim; anything else is
/// prefixed with `agent:<agent_id>:`.
pub fn make_rsclaw_session_key(openclaw_key: &str, agent_id: &str) -> String {
    match openclaw_key {
        namespaced if namespaced.starts_with("agent:") => namespaced.to_owned(),
        bare => format!("agent:{agent_id}:{bare}"),
    }
}
/// Build alternate-key aliases for imported sessions.
///
/// For direct-chat keys `agent:<id>:<channel>:<account>:direct:<peer>` an
/// account-less alias is added, plus channel-remapped variants (with and
/// without the account segment) when the channel appears in `channel_remap`.
/// For group keys `agent:<id>:<channel>:group:<gid>` only the remapped alias
/// is added. Returns `(alias, canonical_key)` pairs.
pub fn generate_session_aliases(
    sessions: &[ConvertedSession],
    channel_remap: &std::collections::HashMap<String, String>,
) -> Vec<(String, String)> {
    let mut out = Vec::new();
    for session in sessions {
        let canonical = &session.session_key;
        let segs: Vec<&str> = canonical.split(':').collect();
        if segs.len() < 3 || segs[0] != "agent" {
            continue;
        }
        match segs.as_slice() {
            // agent:<id>:<channel>:<account>:direct:<peer>
            [_, agent_id, channel, account_id, "direct", peer_id] => {
                let no_account = format!("agent:{agent_id}:{channel}:direct:{peer_id}");
                if no_account != *canonical {
                    out.push((no_account, canonical.clone()));
                }
                if let Some(new_channel) = channel_remap.get(*channel) {
                    out.push((
                        format!("agent:{agent_id}:{new_channel}:direct:{peer_id}"),
                        canonical.clone(),
                    ));
                    out.push((
                        format!("agent:{agent_id}:{new_channel}:{account_id}:direct:{peer_id}"),
                        canonical.clone(),
                    ));
                }
            }
            // agent:<id>:<channel>:group:<gid>
            [_, agent_id, channel, "group", group_id] => {
                if let Some(new_channel) = channel_remap.get(*channel) {
                    out.push((
                        format!("agent:{agent_id}:{new_channel}:group:{group_id}"),
                        canonical.clone(),
                    ));
                }
            }
            _ => {}
        }
    }
    out
}
/// Read every indexed session for one agent and convert it for import.
///
/// Sessions whose transcript is missing, unreadable, or empty are skipped
/// with a log entry; only a missing directory listing is fatal.
pub fn read_agent_sessions(agent_dir: &Path, agent_id: &str) -> Result<Vec<ConvertedSession>> {
    let sessions_dir = agent_dir.join("sessions");
    let index_path = sessions_dir.join("sessions.json");
    if !index_path.is_file() {
        debug!(agent = %agent_id, "no sessions.json found");
        return Ok(Vec::new());
    }
    let mut out = Vec::new();
    for (session_key, descriptor) in &read_sessions_index(&index_path)? {
        let jsonl_path = resolve_jsonl_path(&sessions_dir, descriptor);
        if !jsonl_path.is_file() {
            debug!(
                agent = %agent_id,
                session = %descriptor.session_id,
                path = %jsonl_path.display(),
                "JSONL file not found, skipping"
            );
            continue;
        }
        let messages = match read_session_messages(&jsonl_path) {
            Ok(m) => m,
            Err(e) => {
                warn!(
                    agent = %agent_id,
                    session = %descriptor.session_id,
                    error = %e,
                    "failed to read session JSONL"
                );
                continue;
            }
        };
        if messages.is_empty() {
            debug!(
                agent = %agent_id,
                session = %descriptor.session_id,
                "session has no messages, skipping"
            );
            continue;
        }
        out.push(ConvertedSession {
            session_key: make_rsclaw_session_key(session_key, agent_id),
            openclaw_session_id: descriptor.session_id.clone(),
            agent_id: agent_id.to_owned(),
            messages,
            updated_at: descriptor.updated_at,
        });
    }
    info!(
        agent = %agent_id,
        sessions = out.len(),
        "read agent sessions"
    );
    Ok(out)
}
/// Gather all `memory_put` memories from an agent's transcript files,
/// tagging each entry with the agent id. Per-file read failures are logged
/// and skipped.
pub fn read_agent_memories(agent_dir: &Path, agent_id: &str) -> Result<Vec<ConvertedMemory>> {
    let sessions_dir = agent_dir.join("sessions");
    if !sessions_dir.is_dir() {
        return Ok(Vec::new());
    }
    let mut collected = Vec::new();
    for entry in fs::read_dir(&sessions_dir)?.flatten() {
        let path = entry.path();
        let is_jsonl = path.extension().and_then(|e| e.to_str()) == Some("jsonl");
        if !is_jsonl {
            continue;
        }
        match read_custom_memories(&path) {
            Ok(memories) => {
                collected.extend(memories.into_iter().map(|mut m| {
                    m.agent_id = agent_id.to_owned();
                    m
                }));
            }
            Err(e) => {
                debug!(
                    path = %path.display(),
                    error = %e,
                    "failed to read memories from JSONL"
                );
            }
        }
    }
    info!(
        agent = %agent_id,
        memories = collected.len(),
        "read agent memories"
    );
    Ok(collected)
}
/// Import every agent's sessions from an OpenClaw directory into the redb
/// store, appending each message under its RsClaw session key, then write
/// channel-remap aliases for the imported keys.
///
/// Per-message and per-agent failures are logged and counted in
/// `ImportStats::errors`; the import continues past them.
///
/// NOTE(review): memories are only *counted* here (`stats.memories`), never
/// written to the store — presumably imported by a separate pass; confirm.
pub fn import_sessions_to_redb(
    openclaw_dir: &Path,
    store: &crate::store::redb_store::RedbStore,
) -> Result<ImportStats> {
    let mut stats = ImportStats::default();
    // Kept so aliases can be generated over all agents after the import loop.
    let mut all_sessions: Vec<ConvertedSession> = Vec::new();
    let agents_dir = openclaw_dir.join("agents");
    if !agents_dir.is_dir() {
        info!("no agents/ directory in OpenClaw dir");
        return Ok(stats);
    }
    let entries = fs::read_dir(&agents_dir)?;
    for entry in entries.flatten() {
        let agent_dir = entry.path();
        if !agent_dir.is_dir() {
            continue;
        }
        let agent_id = agent_dir
            .file_name()
            .map(|n| n.to_string_lossy().to_string())
            .unwrap_or_default();
        match read_agent_sessions(&agent_dir, &agent_id) {
            Ok(sessions) => {
                for session in &sessions {
                    for msg in &session.messages {
                        // Tag each row so imported messages stay identifiable.
                        let json_msg = serde_json::json!({
                            "role": msg.role,
                            "content": msg.content,
                            "model": msg.model,
                            "timestamp": msg.timestamp,
                            "source": "openclaw",
                        });
                        match store.append_message(&session.session_key, &json_msg) {
                            Ok(_) => stats.messages += 1,
                            Err(e) => {
                                warn!(
                                    session = %session.session_key,
                                    error = %e,
                                    "failed to import message"
                                );
                                stats.errors += 1;
                            }
                        }
                    }
                    // Counted even when some of its messages failed above.
                    stats.sessions += 1;
                }
                all_sessions.extend(sessions);
            }
            Err(e) => {
                warn!(agent = %agent_id, error = %e, "failed to read agent sessions");
                stats.errors += 1;
            }
        }
        match read_agent_memories(&agent_dir, &agent_id) {
            Ok(memories) => {
                stats.memories += memories.len();
            }
            Err(e) => {
                warn!(agent = %agent_id, error = %e, "failed to read agent memories");
            }
        }
    }
    // Alias old channel names to their renamed equivalents.
    let mut channel_remap = std::collections::HashMap::new();
    channel_remap.insert("openclaw-weixin".to_owned(), "wechat".to_owned());
    let aliases = generate_session_aliases(&all_sessions, &channel_remap);
    if !aliases.is_empty() {
        let alias_refs: Vec<(&str, &str)> = aliases
            .iter()
            .map(|(a, c)| (a.as_str(), c.as_str()))
            .collect();
        store.put_session_aliases(&alias_refs)?;
        info!(count = aliases.len(), "session aliases written");
        stats.aliases = aliases.len();
    }
    info!(
        sessions = stats.sessions,
        messages = stats.messages,
        memories = stats.memories,
        aliases = stats.aliases,
        errors = stats.errors,
        "OpenClaw JSONL import complete"
    );
    Ok(stats)
}
/// Counters for a memory-embedding import run.
#[derive(Debug, Default)]
pub struct MemoryImportStats {
    /// Candidate memories discovered.
    pub total: usize,
    /// Successfully embedded and stored.
    pub imported: usize,
    /// Failures (copy and/or store errors, depending on the importer).
    pub errors: usize,
}
/// Import transcript `memory_put` memories into the memory store, embedding
/// each batch in parallel with rayon.
///
/// The store mutex is held only briefly: embedder handles are cloned out
/// under the lock, embedding runs lock-free, and the lock is re-taken to
/// write the finished batch — so other tasks can use the store between
/// batches.
pub async fn import_memories_to_store(
    openclaw_dir: &Path,
    mem_arc: &std::sync::Arc<tokio::sync::Mutex<crate::agent::MemoryStore>>,
) -> Result<MemoryImportStats> {
    use crate::agent::memory::MemoryDoc;
    use rayon::prelude::*;
    let agents_dir = openclaw_dir.join("agents");
    if !agents_dir.is_dir() {
        info!("openclaw memory import: no agents/ directory");
        return Ok(MemoryImportStats::default());
    }
    // Gather candidates from every agent; unreadable agents are skipped.
    let mut all_memories: Vec<ConvertedMemory> = Vec::new();
    for entry in fs::read_dir(&agents_dir)?.flatten() {
        let agent_dir = entry.path();
        if !agent_dir.is_dir() {
            continue;
        }
        let agent_id = agent_dir
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("")
            .to_owned();
        if agent_id.is_empty() {
            continue;
        }
        if let Ok(mems) = read_agent_memories(&agent_dir, &agent_id) {
            all_memories.extend(mems);
        }
    }
    let total = all_memories.len();
    if total == 0 {
        return Ok(MemoryImportStats::default());
    }
    info!(total, "openclaw memory import: parallel embed starting");
    let started = std::time::Instant::now();
    const BATCH: usize = 100;
    let mut imported = 0usize;
    let mut errors = 0usize;
    // Progress lines on stderr only when there is more than one batch.
    let show_progress = total > BATCH;
    for chunk in all_memories.chunks(BATCH) {
        // Clone the embedder handles out under the lock, then drop the guard
        // so embedding below runs without holding the store mutex.
        let (primary, secondary) = {
            let mem = mem_arc.lock().await;
            mem.embedders_for_dual_write()
        };
        let prepared: Vec<(MemoryDoc, Vec<f32>, Option<Vec<f32>>)> = chunk
            .par_iter()
            .map(|cm| {
                let doc = MemoryDoc {
                    // Stable id derived from agent and key.
                    id: format!("oc:{}:{}", cm.agent_id, cm.key),
                    scope: cm.agent_id.clone(),
                    kind: "openclaw_memory".to_owned(),
                    text: cm.value.clone(),
                    vector: Vec::new(),
                    // Zero timestamps/counters — presumably normalized by the
                    // store on insert; confirm.
                    created_at: 0,
                    accessed_at: 0,
                    access_count: 0,
                    importance: 0.5,
                    tier: Default::default(),
                    abstract_text: None,
                    overview_text: None,
                    tags: vec!["openclaw_import".to_owned()],
                    pinned: false,
                };
                let pv = primary.embed(&doc.text);
                let sv = secondary.as_ref().map(|e| e.embed(&doc.text));
                (doc, pv, sv)
            })
            .collect();
        // Re-take the lock only to write the already-embedded batch.
        let mut mem = mem_arc.lock().await;
        for (doc, pv, sv) in prepared {
            match mem.add_pre_embedded(doc, pv, sv).await {
                Ok(()) => imported += 1,
                Err(e) => {
                    warn!(error = %e, "openclaw memory import: add_pre_embedded failed");
                    errors += 1;
                }
            }
        }
        info!(
            imported,
            total,
            errors,
            elapsed_secs = started.elapsed().as_secs(),
            "openclaw memory import: batch complete"
        );
        if show_progress {
            eprintln!(" · embedded {imported}/{total}");
        }
    }
    info!(
        total,
        imported,
        errors,
        elapsed_secs = started.elapsed().as_secs(),
        "openclaw memory import: done"
    );
    Ok(MemoryImportStats {
        total,
        imported,
        errors,
    })
}
/// Import workspace memory markdown (`MEMORY.md` and `memory/*.md`) from
/// every `workspace*` directory: copy the files into the matching RsClaw
/// workspace (never overwriting) and embed their contents into the memory
/// store.
///
/// Scope is derived from the directory name (`workspace-<scope>`, else
/// "global"). Copy failures and embed failures are counted separately and
/// summed into the returned `errors`.
pub async fn import_workspace_memory(
    openclaw_dir: &Path,
    rsclaw_dir: &Path,
    mem_arc: &std::sync::Arc<tokio::sync::Mutex<crate::agent::MemoryStore>>,
) -> Result<MemoryImportStats> {
    use crate::agent::memory::MemoryDoc;
    use rayon::prelude::*;
    // (scope, kind, text) triples to embed, and (src, dst) file copy jobs.
    let mut sources: Vec<(String, String, String)> = Vec::new();
    let mut copy_jobs: Vec<(PathBuf, PathBuf)> = Vec::new();
    for entry in fs::read_dir(openclaw_dir)?.flatten() {
        let p = entry.path();
        if !p.is_dir() {
            continue;
        }
        let Some(name) = p.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if !name.starts_with("workspace") {
            continue;
        }
        let scope = name.strip_prefix("workspace-").unwrap_or("global").to_owned();
        let dst_workspace = rsclaw_dir.join(name);
        // Core memory file: embedded as one high-importance document.
        let core_src = p.join("MEMORY.md");
        if core_src.is_file()
            && let Ok(text) = fs::read_to_string(&core_src)
            && !text.trim().is_empty()
        {
            sources.push((scope.clone(), "openclaw_memory_core".to_owned(), text));
            copy_jobs.push((core_src, dst_workspace.join("MEMORY.md")));
        }
        // Daily memory notes: one document per non-empty .md file.
        let daily_dir = p.join("memory");
        if daily_dir.is_dir() {
            for f in fs::read_dir(&daily_dir)?.flatten() {
                let fp = f.path();
                if fp.extension().and_then(|e| e.to_str()) != Some("md") {
                    continue;
                }
                let Ok(text) = fs::read_to_string(&fp) else {
                    continue;
                };
                if text.trim().is_empty() {
                    continue;
                }
                sources.push((scope.clone(), "openclaw_memory_daily".to_owned(), text.clone()));
                let fname = fp
                    .file_name()
                    .and_then(|n| n.to_str())
                    .unwrap_or("daily.md")
                    .to_owned();
                copy_jobs.push((fp.clone(), dst_workspace.join("memory").join(fname)));
            }
        }
    }
    let total = sources.len();
    if total == 0 {
        info!("openclaw workspace memory import: nothing to import");
        return Ok(MemoryImportStats::default());
    }
    info!(total, "openclaw workspace memory import: copying + embedding");
    let started = std::time::Instant::now();
    let mut copy_errors = 0usize;
    for (src, dst) in copy_jobs {
        if let Some(parent) = dst.parent() {
            let _ = fs::create_dir_all(parent);
        }
        // Never overwrite files already present on the RsClaw side.
        if dst.exists() {
            continue;
        }
        if let Err(e) = fs::copy(&src, &dst) {
            warn!(error = %e, src = %src.display(), dst = %dst.display(), "workspace memory copy failed");
            copy_errors += 1;
        }
    }
    const BATCH: usize = 50;
    let mut imported = 0usize;
    let mut embed_errors = 0usize;
    let show_progress = sources.len() > BATCH;
    for chunk in sources.chunks(BATCH) {
        // Clone embedder handles under the lock; embed without holding it.
        let (primary, secondary) = {
            let mem = mem_arc.lock().await;
            mem.embedders_for_dual_write()
        };
        let prepared: Vec<(MemoryDoc, Vec<f32>, Option<Vec<f32>>)> = chunk
            .par_iter()
            .enumerate()
            .map(|(i, (scope, kind, text))| {
                // Content-hashed id: identical text maps to the same id.
                let id = format!(
                    "oc-ws:{}:{}:{}",
                    scope,
                    kind,
                    sha256_short(text)
                );
                let doc = MemoryDoc {
                    id,
                    scope: scope.clone(),
                    kind: kind.clone(),
                    text: text.clone(),
                    vector: Vec::new(),
                    created_at: 0,
                    accessed_at: 0,
                    access_count: 0,
                    // Core MEMORY.md is weighted higher and pinned.
                    importance: if kind == "openclaw_memory_core" { 0.8 } else { 0.5 },
                    tier: Default::default(),
                    abstract_text: None,
                    overview_text: None,
                    tags: vec!["openclaw_import".to_owned()],
                    pinned: kind == "openclaw_memory_core",
                };
                // enumerate() index is unused; discarded explicitly.
                let _ = i;
                let pv = primary.embed(&doc.text);
                let sv = secondary.as_ref().map(|e| e.embed(&doc.text));
                (doc, pv, sv)
            })
            .collect();
        let mut mem = mem_arc.lock().await;
        for (doc, pv, sv) in prepared {
            match mem.add_pre_embedded(doc, pv, sv).await {
                Ok(()) => imported += 1,
                Err(e) => {
                    warn!(error = %e, "workspace memory ingest: add_pre_embedded failed");
                    embed_errors += 1;
                }
            }
        }
        if show_progress {
            eprintln!(" · embedded {imported}/{total}");
        }
    }
    info!(
        total,
        imported,
        copy_errors,
        embed_errors,
        elapsed_secs = started.elapsed().as_secs(),
        "openclaw workspace memory import: done"
    );
    Ok(MemoryImportStats {
        total,
        imported,
        errors: copy_errors + embed_errors,
    })
}
/// First 4 bytes of SHA-256(`s`), hex-encoded (8 hex chars) — used to build
/// short, content-derived identifiers.
fn sha256_short(s: &str) -> String {
    use sha2::{Digest, Sha256};
    let digest = Sha256::digest(s.as_bytes());
    hex::encode(&digest[..4])
}
/// Workspace markdown files copied from an OpenClaw workspace into an RsClaw
/// workspace (see `copy_workspace_files`).
const WORKSPACE_FILES: &[&str] = &[
    "SOUL.md",
    "IDENTITY.md",
    "AGENT.md",
    "AGENTS.md",
    "SYSTEM.md",
    "USER.md",
    "MEMORY.md",
];
/// Subset of `WORKSPACE_FILES` whose contents get the OpenClaw → RsClaw
/// rebranding pass; USER.md and MEMORY.md are copied verbatim.
const REBRAND_FILES: &[&str] = &[
    "SOUL.md",
    "IDENTITY.md",
    "AGENT.md",
    "AGENTS.md",
    "SYSTEM.md",
];
/// Rewrite OpenClaw branding to RsClaw branding in a text document.
///
/// Exact "OpenClaw" becomes "RsClaw" (CamelCase preserved); any other ASCII
/// casing of "openclaw" becomes lowercase "rsclaw"; the lobster mascot
/// (龙虾 / 🦞) becomes the crab mascot (螃蟹 / 🦀). UTF-8 is never split.
fn rebrand_content(input: &str) -> String {
    // Handle the CamelCase form first so it keeps its casing.
    let pass1 = input.replace("OpenClaw", "RsClaw");
    let needle = b"openclaw";
    let bytes = pass1.as_bytes();
    let mut out = String::with_capacity(pass1.len());
    let mut i = 0;
    while i < bytes.len() {
        let hit = bytes.len() - i >= needle.len()
            && bytes[i..i + needle.len()].eq_ignore_ascii_case(needle);
        if hit {
            out.push_str("rsclaw");
            i += needle.len();
        } else {
            // Copy one full scalar value. `i` is always on a char boundary:
            // we start at 0 and advance by whole chars or by the pure-ASCII
            // needle length.
            let width = pass1[i..].chars().next().map_or(1, char::len_utf8);
            out.push_str(&pass1[i..i + width]);
            i += width;
        }
    }
    out.replace("龙虾", "螃蟹").replace("🦞", "🦀")
}
/// Copy the known workspace markdown files from an OpenClaw workspace into an
/// RsClaw workspace, rebranding the files listed in `REBRAND_FILES`.
///
/// If the source has only `AGENT.md` (no `AGENTS.md`), it is "promoted": the
/// same content is written to both `AGENT.md` and `AGENTS.md`. Existing
/// destination files are never overwritten. Returns the number of files
/// written.
pub fn copy_workspace_files(
    src_workspace: &Path,
    dst_workspace: &Path,
) -> Result<usize> {
    if !src_workspace.is_dir() {
        return Ok(0);
    }
    std::fs::create_dir_all(dst_workspace)?;
    let promote_agent = src_workspace.join("AGENT.md").is_file()
        && !src_workspace.join("AGENTS.md").is_file();
    let mut written = 0;
    for filename in WORKSPACE_FILES {
        let src = src_workspace.join(filename);
        if !src.is_file() {
            continue;
        }
        let targets: &[&str] = if promote_agent && *filename == "AGENT.md" {
            &["AGENT.md", "AGENTS.md"]
        } else {
            std::slice::from_ref(filename)
        };
        for &target_name in targets {
            let dst = dst_workspace.join(target_name);
            if dst.exists() {
                debug!(file = %target_name, "skipped (already exists)");
                continue;
            }
            if REBRAND_FILES.contains(filename) || REBRAND_FILES.contains(&target_name) {
                let rebranded = rebrand_content(&std::fs::read_to_string(&src)?);
                std::fs::write(&dst, rebranded)?;
                debug!(file = %target_name, src = %filename, "copied workspace file (rebranded)");
            } else {
                std::fs::copy(&src, &dst)?;
                debug!(file = %target_name, src = %filename, "copied workspace file");
            }
            written += 1;
        }
    }
    Ok(written)
}
/// Unit tests pinning the `rebrand_content` contract: CamelCase maps to
/// CamelCase, every other ASCII casing to lowercase, mascot swap, and
/// UTF-8 safety around multibyte neighbors.
#[cfg(test)]
mod rebrand_tests {
    use super::rebrand_content;
    #[test]
    fn camelcase_preserved() {
        assert_eq!(rebrand_content("OpenClaw runs"), "RsClaw runs");
    }
    #[test]
    fn lowercase_swapped() {
        assert_eq!(rebrand_content("openclaw runs"), "rsclaw runs");
    }
    #[test]
    fn uppercase_swapped() {
        // Non-CamelCase casings all normalize to lowercase "rsclaw".
        assert_eq!(rebrand_content("OPENCLAW"), "rsclaw");
    }
    #[test]
    fn mixed_in_one_doc() {
        let src = "OpenClaw is built on top of openclaw — see openClaw.json";
        let out = rebrand_content(src);
        assert_eq!(out, "RsClaw is built on top of rsclaw — see rsclaw.json");
    }
    #[test]
    fn lobster_to_crab() {
        assert_eq!(rebrand_content("我是 🦞 龙虾"), "我是 🦀 螃蟹");
    }
    #[test]
    fn untouched_when_absent() {
        let s = "no branding here, just regular text 中文 also fine";
        assert_eq!(rebrand_content(s), s);
    }
    #[test]
    fn does_not_corrupt_utf8() {
        // Replacement adjacent to multibyte chars must not split them.
        let src = "前置中文OpenClaw后置🦞结尾";
        assert_eq!(rebrand_content(src), "前置中文RsClaw后置🦀结尾");
    }
}
/// Filesystem tests for `copy_workspace_files`: AGENT.md → AGENTS.md
/// promotion and the never-overwrite guarantee, using real temp dirs.
#[cfg(test)]
mod copy_workspace_tests {
    use super::copy_workspace_files;
    use std::fs;
    /// Write `body` at `p`, creating parent directories as needed.
    fn write(p: &std::path::Path, body: &str) {
        fs::create_dir_all(p.parent().unwrap()).unwrap();
        fs::write(p, body).unwrap();
    }
    #[test]
    fn promotes_singular_agent_to_canonical_when_only_one_present() {
        let tmp = tempfile::tempdir().unwrap();
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        write(&src.join("AGENT.md"), "rules from OpenClaw user");
        let n = copy_workspace_files(&src, &dst).unwrap();
        // Two files written: AGENT.md plus the promoted AGENTS.md copy.
        assert_eq!(n, 2);
        assert!(dst.join("AGENT.md").is_file());
        assert!(dst.join("AGENTS.md").is_file());
        let agents = fs::read_to_string(dst.join("AGENTS.md")).unwrap();
        assert!(agents.contains("RsClaw"), "expected rebrand: {agents}");
        assert!(!agents.contains("OpenClaw"));
    }
    #[test]
    fn does_not_promote_when_canonical_already_exists() {
        let tmp = tempfile::tempdir().unwrap();
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        write(&src.join("AGENT.md"), "user singular content");
        write(&src.join("AGENTS.md"), "canonical content");
        copy_workspace_files(&src, &dst).unwrap();
        assert_eq!(
            fs::read_to_string(dst.join("AGENTS.md")).unwrap(),
            "canonical content"
        );
        assert_eq!(
            fs::read_to_string(dst.join("AGENT.md")).unwrap(),
            "user singular content"
        );
    }
    #[test]
    fn skips_targets_that_already_exist_at_destination() {
        let tmp = tempfile::tempdir().unwrap();
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        write(&src.join("AGENT.md"), "incoming");
        write(&dst.join("AGENTS.md"), "pre-existing canonical");
        copy_workspace_files(&src, &dst).unwrap();
        assert_eq!(
            fs::read_to_string(dst.join("AGENTS.md")).unwrap(),
            "pre-existing canonical"
        );
    }
}
/// Convert an OpenClaw `HEARTBEAT.md` into RsClaw's frontmatter format.
///
/// Files that already start with a `---` fence are copied as-is. Otherwise a
/// frontmatter block is synthesized from schedule hints found in the prose
/// (`every`, optional `active_hours`, `timezone`) and prepended. Returns
/// `Ok(true)` when a file was written, `Ok(false)` when there was nothing to
/// do (no source, or destination already present).
pub fn convert_heartbeat(src_workspace: &Path, dst_workspace: &Path) -> Result<bool> {
    let src = src_workspace.join("HEARTBEAT.md");
    if !src.is_file() {
        return Ok(false);
    }
    let dst = dst_workspace.join("HEARTBEAT.md");
    if dst.exists() {
        debug!("HEARTBEAT.md: skipped (already exists)");
        return Ok(false);
    }
    let raw = fs::read_to_string(&src)?;
    if raw.starts_with("---") {
        fs::create_dir_all(dst_workspace)?;
        fs::copy(&src, &dst)?;
        debug!("HEARTBEAT.md: copied as-is (already has frontmatter)");
        return Ok(true);
    }
    // `unwrap_or_else` so the fallback Strings are only allocated on the
    // fallback path (the plain `unwrap_or(..to_owned())` allocated always).
    let every = extract_every(&raw).unwrap_or_else(|| "30m".to_owned());
    let active_hours = extract_active_hours(&raw);
    let timezone = extract_timezone(&raw).unwrap_or_else(|| "Asia/Shanghai".to_owned());
    let mut frontmatter = format!("---\nevery: {every}\n");
    if let Some(ref hours) = active_hours {
        frontmatter.push_str(&format!("active_hours: {hours}\n"));
    }
    frontmatter.push_str(&format!("timezone: {timezone}\n---\n"));
    fs::create_dir_all(dst_workspace)?;
    fs::write(&dst, format!("{frontmatter}{raw}"))?;
    debug!("HEARTBEAT.md: converted with every={every}, active_hours={active_hours:?}");
    Ok(true)
}
/// Pull an "every N <unit>" interval (Chinese or English) out of heartbeat
/// prose, normalized to `"<N>m"`, `"<N>h"` or `"<N>s"`.
fn extract_every(text: &str) -> Option<String> {
    let re = regex::Regex::new(
        r"(?:每\s*(\d+)\s*(?:分钟|分|min)|every\s+(\d+)\s*(?:minutes?|mins?|m)\b|每\s*(\d+)\s*(?:小时|时|hour)|every\s+(\d+)\s*(?:hours?|h)\b|每\s*(\d+)\s*(?:秒|sec)|every\s+(\d+)\s*(?:seconds?|secs?|s)\b)"
    ).ok()?;
    let caps = re.captures(text)?;
    // Capture groups come in (Chinese, English) pairs, one pair per unit.
    let units = [(1usize, 2usize, "m"), (3, 4, "h"), (5, 6, "s")];
    for (zh, en, suffix) in units {
        if let Some(m) = caps.get(zh).or_else(|| caps.get(en)) {
            return Some(format!("{}{}", m.as_str(), suffix));
        }
    }
    None
}
/// Pull an "HH:MM-HH:MM" active-hours range out of heartbeat prose,
/// accepting a hyphen, en dash, or em dash as the separator.
fn extract_active_hours(text: &str) -> Option<String> {
    let caps = regex::Regex::new(r"(\d{1,2}:\d{2})\s*[-–—]\s*(\d{1,2}:\d{2})")
        .ok()?
        .captures(text)?;
    Some(format!("{}-{}", &caps[1], &caps[2]))
}
/// Pull a `timezone: Area/City` declaration out of heartbeat prose
/// (English or Chinese label, ASCII or fullwidth colon).
fn extract_timezone(text: &str) -> Option<String> {
    let re = regex::Regex::new(r"(?:timezone|时区)[::]\s*([A-Za-z_/]+)").ok()?;
    re.captures(text).map(|caps| caps[1].to_owned())
}
/// Copy each skill directory from `<src>/skills` into `<dst>/skills`,
/// skipping skills already present at the destination. Newly copied skills
/// get their `SKILL.md` frontmatter sanitized (backtick quoting stripped).
/// Returns the number of skills copied.
pub fn copy_skills(src_workspace: &Path, dst_workspace: &Path) -> Result<usize> {
    let src_skills = src_workspace.join("skills");
    if !src_skills.is_dir() {
        return Ok(0);
    }
    let dst_skills = dst_workspace.join("skills");
    std::fs::create_dir_all(&dst_skills)?;
    let mut copied = 0;
    for entry in fs::read_dir(&src_skills)?.flatten() {
        let src_path = entry.path();
        if !src_path.is_dir() {
            continue;
        }
        let name = src_path.file_name().unwrap_or_default();
        let dst_path = dst_skills.join(name);
        if dst_path.exists() {
            continue;
        }
        copy_dir_recursive(&src_path, &dst_path)?;
        copied += 1;
        debug!(skill = ?name, "copied skill");
        // Sanitization is best-effort; a failure does not undo the copy.
        let skill_md = dst_path.join("SKILL.md");
        if skill_md.is_file() {
            if let Err(e) = sanitize_skill_md_frontmatter(&skill_md) {
                warn!(error = %e, file = %skill_md.display(), "failed to sanitize SKILL.md frontmatter");
            }
        }
    }
    Ok(copied)
}
/// Strip backtick quoting from simple ``key: `value` `` lines in a SKILL.md
/// YAML frontmatter block, rewriting the file in place.
///
/// Returns `Ok(true)` if the file was rewritten, `Ok(false)` when there is
/// no frontmatter or nothing needed changing. The markdown body after the
/// closing fence is left untouched.
fn sanitize_skill_md_frontmatter(path: &Path) -> Result<bool> {
    let content = fs::read_to_string(path)?;
    let Some(rest) = content.strip_prefix("---\n") else {
        return Ok(false);
    };
    let Some(end_offset) = rest.find("\n---\n") else {
        return Ok(false);
    };
    let frontmatter = &rest[..end_offset];
    let after = &rest[end_offset..];
    let mut changed = false;
    let mut rebuilt = String::with_capacity(frontmatter.len());
    for line in frontmatter.lines() {
        // Accept only `key: `value`` where nothing but whitespace follows the
        // closing backtick and the value itself contains no backtick.
        let unquoted = line.find(": `").and_then(|pos| {
            let key = &line[..pos];
            let inner = &line[pos + 3..];
            let close = inner.rfind('`')?;
            let value = &inner[..close];
            let tail = &inner[close + 1..];
            (!value.contains('`') && tail.trim().is_empty())
                .then(|| format!("{key}: {value}"))
        });
        match unquoted {
            Some(fixed) => {
                rebuilt.push_str(&fixed);
                changed = true;
            }
            None => rebuilt.push_str(line),
        }
        rebuilt.push('\n');
    }
    if !changed {
        return Ok(false);
    }
    fs::write(path, format!("---\n{rebuilt}{after}"))?;
    Ok(true)
}
/// Recursively copy a directory tree, creating destination directories as
/// needed. Unreadable directory entries are silently skipped.
fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
    std::fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)?.flatten() {
        let from = entry.path();
        let to = dst.join(entry.file_name());
        if from.is_dir() {
            copy_dir_recursive(&from, &to)?;
        } else {
            std::fs::copy(&from, &to)?;
        }
    }
    Ok(())
}
/// A chunk of workspace memory markdown, split at `## ` headings.
#[derive(Debug, Clone)]
pub struct MemoryEntry {
    /// Heading text, or the source file name when content precedes any heading.
    pub title: String,
    pub content: String,
    pub agent_id: String,
    /// File (or pseudo-path like `sqlite:chunks:...`) the entry came from.
    pub source_file: String,
}
/// Read `MEMORY.md` plus every `memory/*.md` file in a workspace and split
/// them into heading-level `MemoryEntry` items for the given agent.
pub fn read_workspace_memories(
    workspace_dir: &Path,
    agent_id: &str,
) -> Result<Vec<MemoryEntry>> {
    let mut collected = Vec::new();
    let memory_md = workspace_dir.join("MEMORY.md");
    if memory_md.is_file() {
        let content = fs::read_to_string(&memory_md)?;
        collected.extend(split_markdown_by_headings(&content, agent_id, "MEMORY.md"));
    }
    let memory_dir = workspace_dir.join("memory");
    if memory_dir.is_dir() {
        if let Ok(dir_entries) = fs::read_dir(&memory_dir) {
            for entry in dir_entries.flatten() {
                let path = entry.path();
                if path.extension().and_then(|e| e.to_str()) != Some("md") {
                    continue;
                }
                let filename = path
                    .file_name()
                    .unwrap_or_default()
                    .to_string_lossy()
                    .to_string();
                // Unreadable files are skipped silently, matching MEMORY.md
                // being optional.
                if let Ok(content) = fs::read_to_string(&path) {
                    collected.extend(split_markdown_by_headings(&content, agent_id, &filename));
                }
            }
        }
    }
    info!(
        agent = %agent_id,
        entries = collected.len(),
        "read workspace memories"
    );
    Ok(collected)
}
/// Splits a markdown document into one `MemoryEntry` per `## ` section.
///
/// - Text before the first `## ` heading becomes an entry titled with
///   `source_file`; a single leading `# ` title line (before any other
///   content) is skipped entirely.
/// - Sections whose body is empty after trimming produce no entry.
/// - Heading text is trimmed and becomes the entry title.
fn split_markdown_by_headings(
    content: &str,
    agent_id: &str,
    source_file: &str,
) -> Vec<MemoryEntry> {
    let mut entries = Vec::new();
    let mut current_title = String::new();
    let mut current_lines: Vec<&str> = Vec::new();

    // Flush one accumulated section into `entries`; a no-op when the
    // section body is empty or all-whitespace. (Previously this logic was
    // duplicated at the heading boundary and at end-of-input.)
    let flush = |entries: &mut Vec<MemoryEntry>, title: &str, lines: &[&str]| {
        if lines.is_empty() {
            return;
        }
        let text = lines.join("\n").trim().to_owned();
        if text.is_empty() {
            return;
        }
        entries.push(MemoryEntry {
            title: if title.is_empty() {
                source_file.to_owned()
            } else {
                title.to_owned()
            },
            content: text,
            agent_id: agent_id.to_owned(),
            source_file: source_file.to_owned(),
        });
    };

    for line in content.lines() {
        if let Some(heading) = line.strip_prefix("## ") {
            // Close the previous section and start a new one.
            flush(&mut entries, &current_title, &current_lines);
            current_title = heading.trim().to_owned();
            current_lines.clear();
        } else if line.starts_with("# ") && current_title.is_empty() && current_lines.is_empty() {
            // Skip a leading document title before any section content.
        } else {
            current_lines.push(line);
        }
    }
    // Close the final section.
    flush(&mut entries, &current_title, &current_lines);
    entries
}
/// Extracts memory entries from an OpenClaw SQLite database, opened
/// read-only so the source file is never modified.
///
/// Two table layouts are probed:
/// - `chunks(path, text, updated_at)`: each non-empty `text` becomes an
///   entry titled by its `path`, newest first;
/// - `memories` with a flexible schema: the key column is the first match
///   of `key`/`id`/`name` (falling back to the SQLite `rowid`), the content
///   column the first match of `content`/`value`/`text`/`memory`; the table
///   is skipped entirely when no content-like column exists.
///
/// Rows that fail to decode and rows with blank content are dropped
/// silently (best-effort extraction); only open/prepare errors propagate.
#[cfg(feature = "openclaw-migrate")]
pub fn read_sqlite_memories(
    db_path: &Path,
    agent_id: &str,
) -> Result<Vec<MemoryEntry>> {
    use rusqlite::Connection;
    // Read-only: migration must never mutate the source database.
    let conn = Connection::open_with_flags(
        db_path,
        rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY,
    )?;
    let mut entries = Vec::new();
    if has_table(&conn, "chunks") {
        let mut stmt = conn.prepare(
            "SELECT path, text FROM chunks WHERE text != '' ORDER BY updated_at DESC"
        )?;
        let rows: Vec<MemoryEntry> = stmt
            .query_map([], |row| {
                let path: String = row.get(0)?;
                let text: String = row.get(1)?;
                Ok(MemoryEntry {
                    title: path,
                    content: text,
                    agent_id: agent_id.to_owned(),
                    // Provenance tag: sqlite:<table>:<forward-slash db path>.
                    source_file: format!("sqlite:chunks:{}", crate::config::loader::path_to_forward_slash(db_path)),
                })
            })?
            .filter_map(|r| r.ok())
            .filter(|e| !e.content.trim().is_empty())
            .collect();
        entries.extend(rows);
    }
    if has_table(&conn, "memories") {
        // Column names vary across OpenClaw versions; probe and adapt.
        let columns = table_columns(&conn, "memories")?;
        let key_expr = pick_column(&columns, &["key", "id", "name"])
            .unwrap_or_else(|| "CAST(rowid AS TEXT)".to_owned());
        let content_expr = pick_column(&columns, &["content", "value", "text", "memory"]);
        if let Some(content_col) = content_expr {
            // Identifiers come from PRAGMA table_info on the same database,
            // so interpolating them into the SQL string is safe here.
            let sql = format!(
                "SELECT {key_expr} AS k, {content_col} AS v FROM memories"
            );
            let mut stmt = conn.prepare(&sql)?;
            let rows: Vec<MemoryEntry> = stmt
                .query_map([], |row| {
                    // Defaults to "" on decode failure; blank rows are
                    // filtered out below.
                    let key: String = row.get(0).unwrap_or_default();
                    let content: String = row.get(1).unwrap_or_default();
                    Ok(MemoryEntry {
                        title: key,
                        content,
                        agent_id: agent_id.to_owned(),
                        source_file: format!("sqlite:memories:{}", crate::config::loader::path_to_forward_slash(db_path)),
                    })
                })?
                .filter_map(|r| r.ok())
                .filter(|e| !e.content.trim().is_empty())
                .collect();
            entries.extend(rows);
        }
    }
    info!(
        db = %db_path.display(),
        agent = %agent_id,
        entries = entries.len(),
        "read SQLite memory"
    );
    Ok(entries)
}
/// Returns `true` when a table named `name` exists in the open database.
/// Any query failure (including "no such row") is treated as "absent".
#[cfg(feature = "openclaw-migrate")]
fn has_table(conn: &rusqlite::Connection, name: &str) -> bool {
    let probe: rusqlite::Result<String> = conn.query_row(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?1 LIMIT 1",
        [name],
        |row| row.get(0),
    );
    probe.is_ok()
}
/// Lists the column names of `table` via `PRAGMA table_info`, lowercased
/// so later matching is case-insensitive.
#[cfg(feature = "openclaw-migrate")]
fn table_columns(conn: &rusqlite::Connection, table: &str) -> Result<Vec<String>> {
    let pragma = format!("PRAGMA table_info({table})");
    let mut stmt = conn.prepare(&pragma)?;
    let mut names = Vec::new();
    // Column index 1 of table_info rows is the column name; rows that fail
    // to decode are skipped (best-effort).
    for name in stmt.query_map([], |row| row.get::<_, String>(1))?.flatten() {
        names.push(name.to_ascii_lowercase());
    }
    Ok(names)
}
/// Returns the first candidate name that appears in `columns`, if any.
/// Matching is exact; callers pass a pre-lowercased column list.
#[cfg(feature = "openclaw-migrate")]
fn pick_column(columns: &[String], candidates: &[&str]) -> Option<String> {
    for &candidate in candidates {
        if columns.iter().any(|col| col == candidate) {
            return Some(candidate.to_owned());
        }
    }
    None
}
/// Gathers every memory entry discoverable under an OpenClaw home directory.
///
/// Sources, scanned in order:
/// 1. per-agent workspaces listed in the config's `agents.list` (with `~/`
///    expansion and a fallback remap into `openclaw_dir` when the configured
///    path no longer exists), or the default `workspace/` as agent "main"
///    when the config has no agent list;
/// 2. extra `workspace-<agent>` directories for agents not already covered;
/// 3. SQLite databases: `memory/*.sqlite|*.db` (file stem = agent id, with
///    the legacy "brain" stem mapped to "main") plus every
///    `workspace*/memory/brain.db` — read only when the `openclaw-migrate`
///    feature is enabled, otherwise logged and skipped.
///
/// `config_json` is parsed as JSON5 first, then strict JSON; an unparseable
/// config degrades to the default workspace rather than erroring.
pub fn collect_all_memories(openclaw_dir: &Path, config_json: &str) -> Result<Vec<MemoryEntry>> {
    let mut all = Vec::new();
    // Falls back to Value::Null on a double parse failure, which routes us
    // to the default-workspace branch below.
    let config: serde_json::Value = json5::from_str(config_json)
        .or_else(|_| serde_json::from_str(config_json))
        .unwrap_or_default();
    let default_workspace = openclaw_dir.join("workspace");
    if let Some(agents) = config.pointer("/agents/list").and_then(|v| v.as_array()) {
        for agent in agents {
            let agent_id = agent.get("id").and_then(|v| v.as_str()).unwrap_or("main");
            let workspace_path = agent
                .get("workspace")
                .and_then(|v| v.as_str())
                .map(|p| {
                    // Expand a leading "~/" against the home directory.
                    let expanded = if let Some(rest) = p.strip_prefix("~/") {
                        dirs_next::home_dir().unwrap_or_default().join(rest)
                    } else {
                        PathBuf::from(p)
                    };
                    if expanded.is_dir() {
                        expanded
                    } else if let Some(dirname) = expanded.file_name() {
                        // Configured path is gone (e.g. a copied snapshot):
                        // try the same directory name under openclaw_dir.
                        let remapped = openclaw_dir.join(dirname);
                        if remapped.is_dir() {
                            info!(
                                original = %expanded.display(),
                                remapped = %remapped.display(),
                                "workspace path remapped"
                            );
                            remapped
                        } else {
                            expanded
                        }
                    } else {
                        expanded
                    }
                })
                .unwrap_or_else(|| default_workspace.clone());
            if let Ok(mut entries) = read_workspace_memories(&workspace_path, agent_id) {
                all.append(&mut entries);
            }
        }
    } else {
        // No agent list: scan only the default workspace as agent "main".
        if let Ok(mut entries) = read_workspace_memories(&default_workspace, "main") {
            all.append(&mut entries);
        }
    }
    // Pick up workspace-<agent> directories for agents the config did not
    // mention; `known` is the set of agent ids already harvested above, so
    // this must run after the config-driven pass.
    if let Ok(dir_entries) = fs::read_dir(openclaw_dir) {
        let known: std::collections::HashSet<String> = all.iter().map(|e| e.agent_id.clone()).collect();
        for entry in dir_entries.flatten() {
            let path = entry.path();
            let name = path.file_name().unwrap_or_default().to_string_lossy().to_string();
            if path.is_dir() && name.starts_with("workspace-") {
                let agent_id = name.strip_prefix("workspace-").unwrap_or(&name);
                if !known.contains(agent_id) {
                    if let Ok(mut entries) = read_workspace_memories(&path, agent_id) {
                        all.append(&mut entries);
                    }
                }
            }
        }
    }
    // Discover SQLite memory databases as (path, agent_id) pairs.
    let mut sqlite_paths: Vec<(PathBuf, String)> = Vec::new();
    let memory_dir = openclaw_dir.join("memory");
    if memory_dir.is_dir() {
        if let Ok(dir_entries) = fs::read_dir(&memory_dir) {
            for entry in dir_entries.flatten() {
                let path = entry.path();
                let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
                if ext == "sqlite" || ext == "db" {
                    // File stem names the agent; "brain" is the legacy name
                    // for the main agent's database.
                    let agent_id = path
                        .file_stem()
                        .unwrap_or_default()
                        .to_string_lossy()
                        .to_string();
                    let agent_id = if agent_id == "brain" { "main".to_owned() } else { agent_id };
                    sqlite_paths.push((path, agent_id));
                }
            }
        }
    }
    // Also check workspace*/memory/brain.db. A plain "workspace" directory
    // (no dash) maps to "main" because strip_prefix("workspace-") fails.
    if let Ok(dir_entries) = fs::read_dir(openclaw_dir) {
        for entry in dir_entries.flatten() {
            let path = entry.path();
            let name = path.file_name().unwrap_or_default().to_string_lossy().to_string();
            if path.is_dir() && name.starts_with("workspace") {
                let brain_db = path.join("memory").join("brain.db");
                if brain_db.is_file() {
                    let agent_id = name.strip_prefix("workspace-")
                        .unwrap_or("main")
                        .to_owned();
                    sqlite_paths.push((brain_db, agent_id));
                }
            }
        }
    }
    for (db_path, agent_id) in &sqlite_paths {
        // SQLite extraction requires rusqlite, behind a feature gate.
        #[cfg(feature = "openclaw-migrate")]
        if let Ok(mut entries) = read_sqlite_memories(db_path, agent_id) {
            all.append(&mut entries);
        }
        #[cfg(not(feature = "openclaw-migrate"))]
        {
            warn!(
                db = %db_path.display(),
                "SQLite memory found but openclaw-migrate feature not enabled, skipping"
            );
        }
    }
    // De-duplicate by title+content across all sources, keeping the first
    // occurrence. NOTE(review): the key ignores agent_id, so identical text
    // owned by two different agents collapses into one entry — confirm that
    // is intended.
    let mut seen = std::collections::HashSet::new();
    all.retain(|e| seen.insert(format!("{}:{}", e.title, e.content)));
    info!(total = all.len(), "collected all memory entries");
    Ok(all)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `content` to a fresh SKILL.md inside a temp dir; returns the
    /// dir guard (keeps the files alive) and the file path.
    fn write_skill_md(content: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let skill_path = dir.path().join("SKILL.md");
        std::fs::write(&skill_path, content).unwrap();
        (dir, skill_path)
    }

    #[test]
    fn skill_md_strips_backtick_wrapped_values() {
        let (_guard, path) = write_skill_md(
            "---\nname: foo\nhomepage: `https://example.com/`\nmetadata: {\"k\":\"v\"}\n---\n\n# Body\n",
        );
        // First pass rewrites the frontmatter and reports a change.
        assert!(sanitize_skill_md_frontmatter(&path).unwrap());
        let after = std::fs::read_to_string(&path).unwrap();
        assert!(
            after.contains("homepage: https://example.com/"),
            "expected backticks stripped, got:\n{after}"
        );
        assert!(after.ends_with("\n# Body\n"));
        // Second pass finds nothing left to fix.
        assert!(!sanitize_skill_md_frontmatter(&path).unwrap());
    }

    #[test]
    fn skill_md_no_frontmatter_is_noop() {
        let (_guard, path) = write_skill_md("# Just markdown\n\nNo frontmatter here.\n");
        assert!(!sanitize_skill_md_frontmatter(&path).unwrap());
    }

    #[test]
    fn skill_md_preserves_quoted_strings() {
        let original = "---\nname: bar\ndescription: \"already quoted\"\n---\n\nbody\n";
        let (_guard, path) = write_skill_md(original);
        assert!(!sanitize_skill_md_frontmatter(&path).unwrap());
        assert_eq!(std::fs::read_to_string(&path).unwrap(), original);
    }

    #[test]
    fn extract_string_content() {
        let payload = Some(serde_json::Value::String("hello world".to_owned()));
        assert_eq!(extract_text_content(&payload), "hello world");
    }

    #[test]
    fn extract_array_content() {
        let payload = Some(serde_json::json!([
            {"type": "text", "text": "Hello"},
            {"type": "thinking", "thinking": "I should greet"},
            {"type": "tool_use", "name": "bash", "input": {}}
        ]));
        let rendered = extract_text_content(&payload);
        // Text passes through, thinking is tagged, tool calls are dropped.
        assert!(rendered.contains("Hello"));
        assert!(rendered.contains("[thinking]"));
        assert!(!rendered.contains("bash"));
    }

    #[test]
    fn extract_none_content() {
        assert_eq!(extract_text_content(&None), "");
    }

    #[test]
    fn parse_session_event_message() {
        let raw = r#"{"type":"message","id":"abc","timestamp":"2025-01-01","message":{"role":"user","content":"hello"}}"#;
        let event: SessionEvent = serde_json::from_str(raw).expect("parse");
        assert_eq!(event.event_type, "message");
        assert_eq!(event.message.as_ref().expect("msg").role, "user");
    }

    #[test]
    fn parse_session_event_custom_memory() {
        let raw = r#"{"type":"custom","id":"def","customType":"memory_put","data":{"key":"user_name","value":"Alice"}}"#;
        let event: SessionEvent = serde_json::from_str(raw).expect("parse");
        assert_eq!(event.event_type, "custom");
        assert_eq!(event.custom_type.as_deref(), Some("memory_put"));
        let data = event.data.expect("data");
        assert_eq!(data["key"].as_str(), Some("user_name"));
        assert_eq!(data["value"].as_str(), Some("Alice"));
    }

    #[test]
    fn make_session_key_bare() {
        assert_eq!(make_rsclaw_session_key("main", "default"), "agent:default:main");
    }

    #[test]
    fn make_session_key_already_prefixed() {
        assert_eq!(make_rsclaw_session_key("agent:main:main", "default"), "agent:main:main");
    }

    #[test]
    fn make_session_key_channel_format() {
        assert_eq!(
            make_rsclaw_session_key("agent:main:telegram:direct:12345", "main"),
            "agent:main:telegram:direct:12345"
        );
    }
}