roboticus-cli 0.11.4

CLI commands and migration engine for the Roboticus agent runtime
Documentation
//! Session data migration transforms.

use std::fs;
use std::path::Path;

use super::{
    AreaResult, LegacyJSONLLine, LegacyMessage, LegacySession, MigrationArea, err, now_iso, uuid_v4,
};

pub(crate) fn import_sessions(oc_root: &Path, ic_root: &Path) -> AreaResult {
    let mut all_sessions: Vec<LegacySession> = Vec::new();
    let mut warnings = Vec::new();

    // sessions.json (top-level array)
    let sessions_json = oc_root.join("sessions.json");
    if sessions_json.exists() {
        match fs::read_to_string(&sessions_json) {
            Ok(c) => match serde_json::from_str::<Vec<LegacySession>>(&c) {
                Ok(s) => all_sessions.extend(s),
                Err(e) => warnings.push(format!("Failed to parse sessions.json: {e}")),
            },
            Err(e) => warnings.push(format!("Failed to read sessions.json: {e}")),
        }
    }

    // agents/<agent>/sessions/*.jsonl
    let agents_dir = oc_root.join("agents");
    if agents_dir.exists()
        && let Ok(agents) = fs::read_dir(&agents_dir)
    {
        for agent_entry in agents.flatten() {
            let sess_dir = agent_entry.path().join("sessions");
            if !sess_dir.exists() {
                continue;
            }
            if let Ok(files) = fs::read_dir(&sess_dir) {
                for file in files.flatten() {
                    let path = file.path();
                    match path.extension().and_then(|e| e.to_str()) {
                        Some("jsonl") => {
                            if let Ok(content) = fs::read_to_string(&path) {
                                let mut created_at: Option<String> = None;
                                let mut msgs: Vec<LegacyMessage> = Vec::new();
                                for line in content.lines() {
                                    if let Ok(wrapper) =
                                        serde_json::from_str::<LegacyJSONLLine>(line)
                                        && let Some(lt) = wrapper.line_type.as_deref()
                                    {
                                        if lt == "session" {
                                            created_at = wrapper.timestamp.clone();
                                        }
                                        if lt == "message"
                                            && let Some(msg) = wrapper.message.and_then(|m| {
                                                m.into_message(wrapper.timestamp.as_deref())
                                            })
                                        {
                                            msgs.push(msg);
                                        }
                                        continue;
                                    }
                                    if let Ok(msg) = serde_json::from_str::<LegacyMessage>(line) {
                                        msgs.push(msg);
                                    }
                                }
                                if !msgs.is_empty() {
                                    all_sessions.push(LegacySession {
                                        id: Some(
                                            path.file_stem()
                                                .unwrap_or_default()
                                                .to_string_lossy()
                                                .into(),
                                        ),
                                        agent_id: Some(
                                            agent_entry.file_name().to_string_lossy().into(),
                                        ),
                                        created_at,
                                        messages: Some(msgs),
                                    });
                                }
                            }
                        }
                        Some("json") => {
                            if let Ok(content) = fs::read_to_string(&path)
                                && let Ok(s) = serde_json::from_str::<LegacySession>(&content)
                            {
                                all_sessions.push(s);
                            }
                        }
                        _ => {}
                    }
                }
            }
        }
    }

    if all_sessions.is_empty() && !sessions_json.exists() {
        return AreaResult {
            area: MigrationArea::Sessions,
            success: true,
            items_processed: 0,
            warnings: vec!["No sessions found to import".into()],
            error: None,
        };
    }

    let db_path = ic_root.join("state.db");
    let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
        Ok(d) => d,
        Err(e) => {
            return err(
                MigrationArea::Sessions,
                format!("Failed to open database: {e}"),
            );
        }
    };

    let conn = db.conn();
    let mut items = 0;
    for session in &all_sessions {
        let default_id = uuid_v4();
        let sid = session.id.as_deref().unwrap_or(&default_id);
        let agent = session.agent_id.as_deref().unwrap_or("default");
        let default_ts = now_iso();
        let created = session.created_at.as_deref().unwrap_or(&default_ts);

        if let Err(e) = conn.execute(
            "INSERT OR IGNORE INTO sessions (id, agent_id, created_at) VALUES (?1, ?2, ?3)",
            rusqlite::params![sid, agent, created],
        ) {
            warnings.push(format!("Failed to insert session {sid}: {e}"));
            continue;
        }

        if let Some(msgs) = &session.messages {
            for msg in msgs {
                let mid = uuid_v4();
                let role = msg.role.as_deref().unwrap_or("user");
                let content = msg.content.as_deref().unwrap_or("");
                let ts = msg.timestamp.as_deref().unwrap_or(created);
                if let Err(e) = conn.execute(
                    "INSERT OR IGNORE INTO session_messages (id, session_id, role, content, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
                    rusqlite::params![mid, sid, role, content, ts],
                ) {
                    warnings.push(format!("Failed to import message for session {sid}: {e}"));
                }
            }
        }
        items += 1;
    }

    AreaResult {
        area: MigrationArea::Sessions,
        success: true,
        items_processed: items,
        warnings,
        error: None,
    }
}

pub(crate) fn export_sessions(ic_root: &Path, oc_root: &Path) -> AreaResult {
    let db_path = ic_root.join("state.db");
    if !db_path.exists() {
        return AreaResult {
            area: MigrationArea::Sessions,
            success: true,
            items_processed: 0,
            warnings: vec!["No database found".into()],
            error: None,
        };
    }

    let db = match roboticus_db::Database::new(&db_path.to_string_lossy()) {
        Ok(d) => d,
        Err(e) => {
            return err(
                MigrationArea::Sessions,
                format!("Failed to open database: {e}"),
            );
        }
    };

    let conn = db.conn();
    let mut warnings = Vec::new();
    let mut all: Vec<serde_json::Value> = Vec::new();

    let mut stmt =
        match conn.prepare("SELECT id, agent_id, created_at FROM sessions ORDER BY created_at") {
            Ok(s) => s,
            Err(e) => {
                return err(
                    MigrationArea::Sessions,
                    format!("Failed to query sessions: {e}"),
                );
            }
        };
    let sessions: Vec<(String, String, String)> = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
        ))
    }) {
        Ok(rows) => rows
            .filter_map(|r| {
                r.inspect_err(|e| tracing::warn!("skipping corrupted session row: {e}"))
                    .ok()
            })
            .collect(),
        Err(e) => {
            return err(
                MigrationArea::Sessions,
                format!("Failed to iterate sessions: {e}"),
            );
        }
    };

    for (sid, agent_id, created_at) in &sessions {
        let mut msg_stmt = match conn.prepare(
            "SELECT role, content, created_at FROM session_messages WHERE session_id = ?1 ORDER BY created_at"
        ) {
            Ok(s) => s,
            Err(e) => { warnings.push(format!("Failed to query msgs for {sid}: {e}")); continue; }
        };
        let messages: Vec<serde_json::Value> = msg_stmt
            .query_map(rusqlite::params![sid], |row| {
                Ok(serde_json::json!({
                    "role": row.get::<_, String>(0)?,
                    "content": row.get::<_, String>(1)?,
                    "timestamp": row.get::<_, String>(2)?,
                }))
            })
            .map(|iter| {
                iter.filter_map(|r| {
                    r.inspect_err(
                        |e| tracing::warn!(session = %sid, "skipping corrupted message row: {e}"),
                    )
                    .ok()
                })
                .collect()
            })
            .unwrap_or_default();

        all.push(serde_json::json!({
            "id": sid, "agent_id": agent_id, "created_at": created_at, "messages": messages,
        }));
    }

    if let Err(e) = fs::create_dir_all(oc_root) {
        return err(
            MigrationArea::Sessions,
            format!("Failed to create output dir: {e}"),
        );
    }
    let sessions_json = match serde_json::to_string_pretty(&all) {
        Ok(s) => s,
        Err(e) => {
            return err(
                MigrationArea::Sessions,
                format!("Failed to serialize sessions.json: {e}"),
            );
        }
    };
    if let Err(e) = fs::write(oc_root.join("sessions.json"), &sessions_json) {
        return err(
            MigrationArea::Sessions,
            format!("Failed to write sessions.json: {e}"),
        );
    }

    AreaResult {
        area: MigrationArea::Sessions,
        success: true,
        items_processed: all.len(),
        warnings,
        error: None,
    }
}