use std::fs;
use std::path::Path;
use super::{
AreaResult, LegacyJSONLLine, LegacyMessage, LegacySession, MigrationArea, err, now_iso, uuid_v4,
};
pub(crate) fn import_sessions(oc_root: &Path, ic_root: &Path) -> AreaResult {
let mut all_sessions: Vec<LegacySession> = Vec::new();
let mut warnings = Vec::new();
let sessions_json = oc_root.join("sessions.json");
if sessions_json.exists() {
match fs::read_to_string(&sessions_json) {
Ok(c) => match serde_json::from_str::<Vec<LegacySession>>(&c) {
Ok(s) => all_sessions.extend(s),
Err(e) => warnings.push(format!("Failed to parse sessions.json: {e}")),
},
Err(e) => warnings.push(format!("Failed to read sessions.json: {e}")),
}
}
let agents_dir = oc_root.join("agents");
if agents_dir.exists()
&& let Ok(agents) = fs::read_dir(&agents_dir)
{
for agent_entry in agents.flatten() {
let sess_dir = agent_entry.path().join("sessions");
if !sess_dir.exists() {
continue;
}
if let Ok(files) = fs::read_dir(&sess_dir) {
for file in files.flatten() {
let path = file.path();
match path.extension().and_then(|e| e.to_str()) {
Some("jsonl") => {
if let Ok(content) = fs::read_to_string(&path) {
let mut created_at: Option<String> = None;
let mut msgs: Vec<LegacyMessage> = Vec::new();
for line in content.lines() {
if let Ok(wrapper) =
serde_json::from_str::<LegacyJSONLLine>(line)
&& let Some(lt) = wrapper.line_type.as_deref()
{
if lt == "session" {
created_at = wrapper.timestamp.clone();
}
if lt == "message"
&& let Some(msg) = wrapper.message.and_then(|m| {
m.into_message(wrapper.timestamp.as_deref())
})
{
msgs.push(msg);
}
continue;
}
if let Ok(msg) = serde_json::from_str::<LegacyMessage>(line) {
msgs.push(msg);
}
}
if !msgs.is_empty() {
all_sessions.push(LegacySession {
id: Some(
path.file_stem()
.unwrap_or_default()
.to_string_lossy()
.into(),
),
agent_id: Some(
agent_entry.file_name().to_string_lossy().into(),
),
created_at,
messages: Some(msgs),
});
}
}
}
Some("json") => {
if let Ok(content) = fs::read_to_string(&path)
&& let Ok(s) = serde_json::from_str::<LegacySession>(&content)
{
all_sessions.push(s);
}
}
_ => {}
}
}
}
}
}
if all_sessions.is_empty() && !sessions_json.exists() {
return AreaResult {
area: MigrationArea::Sessions,
success: true,
items_processed: 0,
warnings: vec!["No sessions found to import".into()],
error: None,
};
}
let db_path = ic_root.join("state.db");
let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
Ok(d) => d,
Err(e) => {
return err(
MigrationArea::Sessions,
format!("Failed to open database: {e}"),
);
}
};
let conn = db.conn();
let mut items = 0;
for session in &all_sessions {
let default_id = uuid_v4();
let sid = session.id.as_deref().unwrap_or(&default_id);
let agent = session.agent_id.as_deref().unwrap_or("default");
let default_ts = now_iso();
let created = session.created_at.as_deref().unwrap_or(&default_ts);
if let Err(e) = conn.execute(
"INSERT OR IGNORE INTO sessions (id, agent_id, created_at) VALUES (?1, ?2, ?3)",
rusqlite::params![sid, agent, created],
) {
warnings.push(format!("Failed to insert session {sid}: {e}"));
continue;
}
if let Some(msgs) = &session.messages {
for msg in msgs {
let mid = uuid_v4();
let role = msg.role.as_deref().unwrap_or("user");
let content = msg.content.as_deref().unwrap_or("");
let ts = msg.timestamp.as_deref().unwrap_or(created);
if let Err(e) = conn.execute(
"INSERT OR IGNORE INTO session_messages (id, session_id, role, content, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![mid, sid, role, content, ts],
) {
warnings.push(format!("Failed to import message for session {sid}: {e}"));
}
}
}
items += 1;
}
AreaResult {
area: MigrationArea::Sessions,
success: true,
items_processed: items,
warnings,
error: None,
}
}
pub(crate) fn export_sessions(ic_root: &Path, oc_root: &Path) -> AreaResult {
let db_path = ic_root.join("state.db");
if !db_path.exists() {
return AreaResult {
area: MigrationArea::Sessions,
success: true,
items_processed: 0,
warnings: vec!["No database found".into()],
error: None,
};
}
let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
Ok(d) => d,
Err(e) => {
return err(
MigrationArea::Sessions,
format!("Failed to open database: {e}"),
);
}
};
let conn = db.conn();
let mut warnings = Vec::new();
let mut all: Vec<serde_json::Value> = Vec::new();
let mut stmt =
match conn.prepare("SELECT id, agent_id, created_at FROM sessions ORDER BY created_at") {
Ok(s) => s,
Err(e) => {
return err(
MigrationArea::Sessions,
format!("Failed to query sessions: {e}"),
);
}
};
let sessions: Vec<(String, String, String)> = match stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
))
}) {
Ok(rows) => rows
.filter_map(|r| {
r.inspect_err(|e| tracing::warn!("skipping corrupted session row: {e}"))
.ok()
})
.collect(),
Err(e) => {
return err(
MigrationArea::Sessions,
format!("Failed to iterate sessions: {e}"),
);
}
};
for (sid, agent_id, created_at) in &sessions {
let mut msg_stmt = match conn.prepare(
"SELECT role, content, created_at FROM session_messages WHERE session_id = ?1 ORDER BY created_at"
) {
Ok(s) => s,
Err(e) => { warnings.push(format!("Failed to query msgs for {sid}: {e}")); continue; }
};
let messages: Vec<serde_json::Value> = msg_stmt
.query_map(rusqlite::params![sid], |row| {
Ok(serde_json::json!({
"role": row.get::<_, String>(0)?,
"content": row.get::<_, String>(1)?,
"timestamp": row.get::<_, String>(2)?,
}))
})
.map(|iter| {
iter.filter_map(|r| {
r.inspect_err(
|e| tracing::warn!(session = %sid, "skipping corrupted message row: {e}"),
)
.ok()
})
.collect()
})
.unwrap_or_default();
all.push(serde_json::json!({
"id": sid, "agent_id": agent_id, "created_at": created_at, "messages": messages,
}));
}
if let Err(e) = fs::create_dir_all(oc_root) {
return err(
MigrationArea::Sessions,
format!("Failed to create output dir: {e}"),
);
}
let sessions_json = match serde_json::to_string_pretty(&all) {
Ok(s) => s,
Err(e) => {
return err(
MigrationArea::Sessions,
format!("Failed to serialize sessions.json: {e}"),
);
}
};
if let Err(e) = fs::write(oc_root.join("sessions.json"), &sessions_json) {
return err(
MigrationArea::Sessions,
format!("Failed to write sessions.json: {e}"),
);
}
AreaResult {
area: MigrationArea::Sessions,
success: true,
items_processed: all.len(),
warnings,
error: None,
}
}