rho-coding-agent 0.8.0

A lightweight agent harness inspired by Pi
use std::{
    collections::HashSet,
    fs,
    path::{Path, PathBuf},
};

use rusqlite::{params, Connection, OptionalExtension};

use crate::model::Message;

use super::{
    clamp_u64_to_i64, session_dir_in_root, session_file_stats, session_id_from_path,
    set_private_dir_permissions, summarize_session_file, unix_timestamp_secs, user_message_text,
    workspace_key, Session, SessionIndexRecord, SessionSummary,
};

pub(super) fn list_workspace_sessions(
    session_root: &Path,
    cwd: &Path,
) -> anyhow::Result<Vec<SessionSummary>> {
    sync_workspace(session_root, cwd)?;
    let connection = open_index(session_root)?;
    let workspace_key = workspace_key(cwd);
    let mut statement = connection.prepare(
        "select id, path, cwd, created_at, updated_at, message_count,
                title, first_user_message, last_user_message
         from sessions
         where workspace_key = ?1
         order by updated_at desc, created_at desc, id asc",
    )?;
    let rows = statement.query_map(params![workspace_key], summary_from_row)?;
    Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}

pub(super) fn matching_session_paths(
    session_root: &Path,
    cwd: &Path,
    id_prefix: &str,
) -> anyhow::Result<Vec<PathBuf>> {
    let connection = open_index(session_root)?;
    let workspace_key = workspace_key(cwd);
    let mut statement = connection.prepare(
        "select path
         from sessions
         where workspace_key = ?1 and substr(id, 1, length(?2)) = ?2
         order by id asc",
    )?;
    let rows = statement.query_map(params![workspace_key, id_prefix], |row| {
        let path: String = row.get(0)?;
        Ok(PathBuf::from(path))
    })?;
    Ok(rows
        .collect::<rusqlite::Result<Vec<_>>>()?
        .into_iter()
        .filter(|path| path.exists())
        .collect())
}

pub(super) fn sync_workspace(session_root: &Path, cwd: &Path) -> anyhow::Result<()> {
    let connection = open_index(session_root)?;
    let workspace_key = workspace_key(cwd);
    let dir = session_dir_in_root(session_root, cwd);
    let mut seen = HashSet::new();

    if dir.exists() {
        for entry in fs::read_dir(&dir)? {
            let path = entry?.path();
            let Some(id) = session_id_from_path(&path) else {
                continue;
            };
            seen.insert(id.clone());
            let (file_size, file_mtime) = session_file_stats(&path);
            if indexed_file_is_current(
                &connection,
                &workspace_key,
                &id,
                &path,
                file_size,
                file_mtime,
            )? {
                continue;
            }
            if let Ok(record) = summarize_session_file(&path, cwd) {
                upsert_record(&connection, &workspace_key, &record)?;
            }
        }
    }

    remove_stale_records(&connection, &workspace_key, &seen)?;
    Ok(())
}

pub(super) fn record_created(session: &Session, created_at: u64) -> anyhow::Result<()> {
    let connection = open_index(&session.session_root)?;
    let (file_size, file_mtime) = session_file_stats(&session.path);
    let record = SessionIndexRecord {
        summary: SessionSummary {
            id: session.id.clone(),
            path: session.path.clone(),
            cwd: session.cwd.clone(),
            created_at,
            updated_at: created_at,
            message_count: 0,
            title: None,
            first_user_message: None,
            last_user_message: None,
        },
        file_size,
        file_mtime,
    };
    upsert_record(&connection, &session.workspace_key, &record)
}

pub(super) fn set_title(
    session_root: &Path,
    cwd: &Path,
    id_prefix: &str,
    title: &str,
) -> anyhow::Result<()> {
    let connection = open_index(session_root)?;
    let workspace_key = workspace_key(cwd);
    let paths = matching_session_paths(session_root, cwd, id_prefix)?;
    match paths.as_slice() {
        [] => anyhow::bail!("no session found matching '{id_prefix}'"),
        [path] => {
            let id = session_id_from_path(path).ok_or_else(|| {
                anyhow::anyhow!("session file has invalid name: {}", path.display())
            })?;
            connection.execute(
                "update sessions set title = ?3 where workspace_key = ?1 and id = ?2",
                params![workspace_key, id, title.trim()],
            )?;
            Ok(())
        }
        _ => anyhow::bail!("multiple sessions match '{id_prefix}'; use a longer UUID prefix"),
    }
}

pub(super) fn record_message(session: &Session, message: &Message) -> anyhow::Result<()> {
    let connection = open_index(&session.session_root)?;
    let updated_at = clamp_u64_to_i64(unix_timestamp_secs());
    let user_message = user_message_text(message);
    let (file_size, file_mtime) = session_file_stats(&session.path);
    let rows = connection.execute(
        "update sessions
         set updated_at = max(updated_at, ?3),
             message_count = message_count + 1,
             first_user_message = coalesce(first_user_message, ?4),
             last_user_message = coalesce(?4, last_user_message),
             file_size = ?5,
             file_mtime = ?6
         where workspace_key = ?1 and id = ?2",
        params![
            session.workspace_key.as_str(),
            session.id.as_str(),
            updated_at,
            user_message,
            file_size,
            file_mtime
        ],
    )?;
    if rows == 0 {
        let record = summarize_session_file(&session.path, &session.cwd)?;
        upsert_record(&connection, &session.workspace_key, &record)?;
    }
    Ok(())
}

pub(super) fn record_replaced(session: &Session) -> anyhow::Result<()> {
    let connection = open_index(&session.session_root)?;
    let record = summarize_session_file(&session.path, &session.cwd)?;
    upsert_record(&connection, &session.workspace_key, &record)
}

fn open_index(session_root: &Path) -> anyhow::Result<Connection> {
    fs::create_dir_all(session_root)?;
    set_private_dir_permissions(session_root)?;
    let path = session_root.join("index.sqlite3");
    let connection = Connection::open(&path)?;
    set_private_file_permissions(&path)?;
    connection.execute_batch(
        "create table if not exists sessions (
            workspace_key text not null,
            cwd text not null,
            id text not null,
            path text not null,
            created_at integer not null,
            updated_at integer not null,
            message_count integer not null default 0,
            title text,
            first_user_message text,
            last_user_message text,
            file_size integer,
            file_mtime integer,
            primary key (workspace_key, id)
        );
        create index if not exists sessions_workspace_updated_idx
            on sessions(workspace_key, updated_at desc);
        create index if not exists sessions_workspace_id_idx
            on sessions(workspace_key, id);",
    )?;
    ensure_column(&connection, "title text")?;
    ensure_column(&connection, "first_user_message text")?;
    Ok(connection)
}

fn ensure_column(connection: &Connection, column_definition: &str) -> anyhow::Result<()> {
    let column_name = column_definition
        .split_whitespace()
        .next()
        .ok_or_else(|| anyhow::anyhow!("column definition must include a name"))?;
    let mut statement = connection.prepare("pragma table_info(sessions)")?;
    let columns = statement.query_map([], |row| row.get::<_, String>(1))?;
    let exists = columns
        .collect::<rusqlite::Result<Vec<_>>>()?
        .iter()
        .any(|column| column == column_name);
    if !exists {
        connection.execute(
            &format!("alter table sessions add column {column_definition}"),
            [],
        )?;
    }
    Ok(())
}

fn set_private_file_permissions(path: &Path) -> anyhow::Result<()> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        fs::set_permissions(path, fs::Permissions::from_mode(0o600))?;
    }
    #[cfg(not(unix))]
    {
        let _ = path;
    }
    Ok(())
}

fn indexed_file_is_current(
    connection: &Connection,
    workspace_key: &str,
    id: &str,
    path: &Path,
    file_size: Option<i64>,
    file_mtime: Option<i64>,
) -> rusqlite::Result<bool> {
    let current = connection
        .query_row(
            "select path, file_size, file_mtime, message_count, first_user_message
             from sessions where workspace_key = ?1 and id = ?2",
            params![workspace_key, id],
            |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, Option<i64>>(1)?,
                    row.get::<_, Option<i64>>(2)?,
                    row.get::<_, i64>(3)?,
                    row.get::<_, Option<String>>(4)?,
                ))
            },
        )
        .optional()?;
    Ok(current.is_some_and(
        |(indexed_path, indexed_size, indexed_mtime, message_count, first_user_message)| {
            indexed_path == path.to_string_lossy().as_ref()
                && indexed_size == file_size
                && indexed_mtime == file_mtime
                && (message_count == 0 || first_user_message.is_some())
        },
    ))
}

fn upsert_record(
    connection: &Connection,
    workspace_key: &str,
    record: &SessionIndexRecord,
) -> anyhow::Result<()> {
    let cwd = record.summary.cwd.to_string_lossy().to_string();
    let path = record.summary.path.to_string_lossy().to_string();
    connection.execute(
        "insert into sessions (
            workspace_key,
            cwd,
            id,
            path,
            created_at,
            updated_at,
            message_count,
            title,
            first_user_message,
            last_user_message,
            file_size,
            file_mtime
         ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
         on conflict(workspace_key, id) do update set
            cwd = excluded.cwd,
            path = excluded.path,
            created_at = excluded.created_at,
            updated_at = excluded.updated_at,
            message_count = excluded.message_count,
            title = coalesce(sessions.title, excluded.title),
            first_user_message = excluded.first_user_message,
            last_user_message = excluded.last_user_message,
            file_size = excluded.file_size,
            file_mtime = excluded.file_mtime",
        params![
            workspace_key,
            cwd,
            record.summary.id.as_str(),
            path,
            clamp_u64_to_i64(record.summary.created_at),
            clamp_u64_to_i64(record.summary.updated_at),
            clamp_u64_to_i64(record.summary.message_count),
            record.summary.title.as_deref(),
            record.summary.first_user_message.as_deref(),
            record.summary.last_user_message.as_deref(),
            record.file_size,
            record.file_mtime,
        ],
    )?;
    Ok(())
}

fn remove_stale_records(
    connection: &Connection,
    workspace_key: &str,
    seen: &HashSet<String>,
) -> anyhow::Result<()> {
    let mut statement =
        connection.prepare("select id, path from sessions where workspace_key = ?1")?;
    let rows = statement.query_map(params![workspace_key], |row| {
        Ok((
            row.get::<_, String>(0)?,
            PathBuf::from(row.get::<_, String>(1)?),
        ))
    })?;
    let stale_ids = rows
        .collect::<rusqlite::Result<Vec<_>>>()?
        .into_iter()
        .filter_map(|(id, path)| (!seen.contains(&id) || !path.exists()).then_some(id))
        .collect::<Vec<_>>();

    for id in stale_ids {
        connection.execute(
            "delete from sessions where workspace_key = ?1 and id = ?2",
            params![workspace_key, id],
        )?;
    }
    Ok(())
}

fn summary_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionSummary> {
    Ok(SessionSummary {
        id: row.get(0)?,
        path: PathBuf::from(row.get::<_, String>(1)?),
        cwd: PathBuf::from(row.get::<_, String>(2)?),
        created_at: row.get::<_, i64>(3)?.max(0) as u64,
        updated_at: row.get::<_, i64>(4)?.max(0) as u64,
        message_count: row.get::<_, i64>(5)?.max(0) as u64,
        title: row.get(6)?,
        first_user_message: row.get(7)?,
        last_user_message: row.get(8)?,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn open_index_creates_schema() {
        let root = TempDir::new().unwrap();
        let connection = open_index(root.path()).unwrap();

        let table_count: i64 = connection
            .query_row(
                "select count(*) from sqlite_master where type = 'table' and name = 'sessions'",
                [],
                |row| row.get(0),
            )
            .unwrap();

        assert_eq!(table_count, 1);
        assert!(root.path().join("index.sqlite3").exists());
    }
}