chasm-cli 1.5.4

Universal chat session manager - harvest, merge, and analyze AI chat history from VS Code, Cursor, and other editors
Documentation
"""
Comprehensive fix for Agentic workspace session loading.

Root cause: The .jsonl file only has 1 request (from a truncated migration),
while the .json.bak has the full 28-request session. Also fixes stale DB refs.

Steps:
1. Read .json.bak (26MB, 28 requests - the REAL session data)
2. Clean up and convert to proper ISerializableChatData3 format
3. Back up the tiny .jsonl, then replace it with a single JSONL kind:0 line
4. Rewrite the DB index and model cache, then clean up stale DB references
   (state.cache, memento)
5. Report orphaned session files, then re-read the DB and .jsonl to verify
"""
import sqlite3
import json
import os
import shutil
import base64
from datetime import datetime

# Name of the per-workspace folder under VS Code's workspaceStorage directory.
WS_HASH = "5ec71800c69c79b96b06a37e38537907"
# Resolved under %APPDATA%; the lookup raises KeyError at import time when the
# APPDATA environment variable is not set.
WS_DIR = os.path.join(os.environ["APPDATA"], "Code", "User", "workspaceStorage", WS_HASH)
# SQLite database whose ItemTable rows are read and rewritten by main().
DB_PATH = os.path.join(WS_DIR, "state.vscdb")
# Directory holding one file per chat session, named after the session id.
SESSIONS_DIR = os.path.join(WS_DIR, "chatSessions")

# Id of the single session this script repairs.
SESSION_ID = "6be29cba-331e-4aa4-bc58-659cc20f4800"
# Input: the full session data that main() reads.
JSON_BAK = os.path.join(SESSIONS_DIR, f"{SESSION_ID}.json.bak")
# Output: the session file that main() backs up and then overwrites.
JSONL_FILE = os.path.join(SESSIONS_DIR, f"{SESSION_ID}.jsonl")
# NOTE(review): not referenced anywhere else in the visible code.
JSONL_BAK = os.path.join(SESSIONS_DIR, f"{SESSION_ID}.jsonl.bak")


def read_db_key(conn, key):
    """Return the value stored under *key* in ``ItemTable``, decoded from JSON.

    Args:
        conn: Open ``sqlite3`` connection to a database that has an
            ``ItemTable`` table with ``key`` and ``value`` columns.
        key: The key to look up.

    Returns:
        ``None`` when no row matches *key*. Otherwise the parsed JSON value
        when the stored value is valid JSON, or the raw stored value
        unchanged when it is not. A stored JSON ``null`` therefore also
        yields ``None`` and cannot be told apart from a missing key.
    """
    row = conn.execute(
        "SELECT value FROM ItemTable WHERE key = ?", (key,)
    ).fetchone()
    if row is None:
        return None

    raw = row[0]
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError covers values that are not text/bytes (e.g. INTEGER or
        # NULL columns). Anything else, such as KeyboardInterrupt, must
        # propagate instead of being swallowed.
        return raw


def write_db_key(conn, key, value):
    """Store *value* under *key* in ``ItemTable`` as compact JSON text.

    An existing row with the same key is replaced. The statement is executed
    on *conn* but not committed here; committing is left to the caller.

    Args:
        conn: Open ``sqlite3`` connection to a database with an ``ItemTable``.
        key: The key to insert or replace.
        value: Any JSON-serialisable object.
    """
    # Compact separators: no whitespace after "," or ":" in the stored text.
    payload = json.dumps(value, separators=(",", ":"))
    statement = "INSERT OR REPLACE INTO ItemTable (key, value) VALUES (?, ?)"
    conn.execute(statement, (key, payload))


def session_to_b64(session_id):
    """Return *session_id* encoded as UTF-8 bytes and then as Base64 text."""
    raw_bytes = session_id.encode("utf-8")
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")


def session_resource(session_id):
    """Return the ``vscode-chat-session://local/`` URI string for *session_id*.

    The path component is the Base64 form of the id as produced by
    ``session_to_b64``.
    """
    encoded_id = session_to_b64(session_id)
    return "vscode-chat-session://local/" + encoded_id


def _fix_model_states(requests):
    """Make every request's ``modelState`` terminal and give it ``completedAt``.

    States with value 0 or 2 (labelled Pending / NeedsInput by the original
    author) are replaced by a fresh state with value 3 (Cancelled). States
    with value 1, 3 or 4 that lack ``completedAt`` get one added. The
    request's own ``timestamp`` is used when the key is present, otherwise
    the current time in milliseconds. Requests are modified in place.

    Returns:
        The number of ``modelState`` entries that were changed.
    """
    now_ms = int(datetime.now().timestamp() * 1000)
    fixed = 0
    for req in requests:
        state = req.get("modelState")
        if not isinstance(state, dict):
            continue
        value = state.get("value")
        if value in (0, 2):
            req["modelState"] = {
                "value": 3,  # Cancelled
                "completedAt": req.get("timestamp", now_ms),
            }
            fixed += 1
        elif value in (1, 3, 4) and "completedAt" not in state:
            state["completedAt"] = req.get("timestamp", now_ms)
            fixed += 1
    return fixed


def _list_session_ids():
    """Return the ids of all ``*.jsonl`` files in ``SESSIONS_DIR``.

    Backup files (``.jsonl.bak``, ``.jsonl.pre_fix_bak``) do not end in
    ``.jsonl`` and are therefore excluded by the suffix test alone.
    """
    suffix = ".jsonl"
    # Slice off only the trailing suffix; str.replace would also remove
    # any ".jsonl" occurring earlier in the file name.
    return {
        name[: -len(suffix)]
        for name in os.listdir(SESSIONS_DIR)
        if name.endswith(suffix)
    }


def _sibling_index_entry(sid):
    """Build the index entry for another session file in ``SESSIONS_DIR``.

    Only the first line of ``<sid>.jsonl`` is read. Returns ``None`` when
    that line is not a ``kind: 0`` record. I/O, JSON and shape errors
    propagate to the caller.
    """
    sfile = os.path.join(SESSIONS_DIR, f"{sid}.jsonl")
    # Read as UTF-8 explicitly (the encoding main() writes with) rather than
    # the platform's locale default. Sibling files are assumed to be UTF-8.
    with open(sfile, "r", encoding="utf-8") as fh:
        first_line = fh.readline().strip()
    sdata = json.loads(first_line)
    if sdata.get("kind") != 0:
        return None

    sv = sdata["v"]
    created = sv.get("creationDate", 0)
    return {
        "sessionId": sid,
        "title": sv.get("customTitle") or "Untitled",
        "lastMessageDate": created,
        "timing": {
            "created": created,
            "lastRequestStarted": created,
            "lastRequestEnded": created,
        },
        "lastResponseState": 1,
        "initialLocation": "panel",
        "isEmpty": len(sv.get("requests", [])) == 0,
        "isExternal": False,
        "hasPendingEdits": False,
    }


def main():
    """Rebuild the session ``.jsonl`` from ``.json.bak`` and repair DB state.

    Steps, in order:
      1. Load the full session from ``JSON_BAK``.
      2. Convert it to a version-3 session object and fix ``modelState``.
      3. Back up ``JSONL_FILE`` and overwrite it with one ``kind: 0`` line.
      4. Rewrite the session index and model cache in ``DB_PATH``, prune the
         state cache and point the memento at ``SESSION_ID``.
      5. Report session files that did not make it into the index.
    Finally the DB keys and the JSONL file are re-read and summarised.

    All DB writes are committed together at the end of step 4. If any step
    raises before that, the connection is closed without committing.

    Raises:
        KeyError: If the backup lacks ``creationDate`` or ``requests``.
        OSError: If a required file or directory cannot be read or written.
        json.JSONDecodeError: If the backup is not valid JSON.
        sqlite3.Error: If a database operation fails.
    """
    print("=" * 60)
    print("  Agentic Workspace Comprehensive Fix")
    print("=" * 60)

    # ─── Step 1: Read .json.bak ───────────────────────────────
    print(f"\n[1] Reading .json.bak ({os.path.getsize(JSON_BAK):,} bytes)...")
    with open(JSON_BAK, "r", encoding="utf-8") as f:
        full_data = json.load(f)

    num_requests = len(full_data.get("requests", []))
    print(f"    Session: {full_data.get('customTitle')}")
    print(f"    Requests: {num_requests}")
    print(f"    Version: {full_data.get('version')}")

    # ─── Step 2: Clean up data format ────────────────────────
    print("\n[2] Cleaning up data format...")

    # Build proper ISerializableChatData3
    clean_data = {
        "version": 3,
        "sessionId": SESSION_ID,
        "creationDate": full_data["creationDate"],
        "initialLocation": full_data.get("initialLocation", "panel"),
        "requests": full_data["requests"],
        "responderUsername": full_data.get("responderUsername", "GitHub Copilot"),
        "customTitle": full_data.get("customTitle"),
        "hasPendingEdits": False,
    }

    # Convert old top-level input state fields to inputState object
    if "inputText" in full_data or "attachments" in full_data:
        clean_data["inputState"] = {
            "attachments": full_data.get("attachments", []),
            "mode": full_data.get("mode", {"id": "agent", "kind": "agent"}),
            "selectedModel": full_data.get("selectedModel"),
            "inputText": full_data.get("inputText", ""),
            "selections": full_data.get("selections", []),
            "contrib": full_data.get("contrib", {}),
        }

    # Clear pendingRequests
    clean_data["pendingRequests"] = []

    # Fix modelState values: ensure terminal states have completedAt
    fixed_states = _fix_model_states(clean_data["requests"])
    if fixed_states > 0:
        print(f"    Fixed {fixed_states} modelState entries")

    print(f"    Clean data: {len(clean_data['requests'])} requests, version {clean_data['version']}")

    # ─── Step 3: Write JSONL ─────────────────────────────────
    print("\n[3] Writing JSONL file...")

    # Backup current .jsonl
    if os.path.exists(JSONL_FILE):
        backup_path = JSONL_FILE + ".pre_fix_bak"
        shutil.copy2(JSONL_FILE, backup_path)
        print(f"    Backed up current .jsonl to {os.path.basename(backup_path)}")

    # Write new JSONL; newline="" keeps the "\n" terminator untranslated.
    jsonl_line = json.dumps({"kind": 0, "v": clean_data}, separators=(",", ":"))
    with open(JSONL_FILE, "w", encoding="utf-8", newline="") as f:
        f.write(jsonl_line)
        f.write("\n")

    new_size = os.path.getsize(JSONL_FILE)
    print(f"    Written: {new_size:,} bytes ({new_size/1024/1024:.1f} MB)")

    # ─── Step 4: Fix DB references ───────────────────────────
    print("\n[4] Fixing DB references...")
    conn = sqlite3.connect(DB_PATH)
    try:
        # Get existing session files on disk
        session_files = _list_session_ids()
        print(f"    Session files on disk: {session_files}")

        # ── Fix index ──
        # Get request timing from the full session
        requests = clean_data["requests"]
        last_req_ts = requests[-1].get("timestamp") if requests else clean_data["creationDate"]
        last_resp_end = None
        for req in reversed(requests):
            ms = req.get("modelState", {})
            if isinstance(ms, dict) and "completedAt" in ms:
                last_resp_end = ms["completedAt"]
                break

        index = {
            "version": 1,
            "entries": {},
        }

        # Add the main session
        index["entries"][SESSION_ID] = {
            "sessionId": SESSION_ID,
            "title": clean_data["customTitle"] or "Porting VS Code and Copilot Chat to Rust",
            "lastMessageDate": last_req_ts,
            "timing": {
                "created": clean_data["creationDate"],
                "lastRequestStarted": last_req_ts,
                "lastRequestEnded": last_resp_end or last_req_ts,
            },
            "initialLocation": "panel",
            "hasPendingEdits": False,
            "isEmpty": False,
            "isExternal": False,
            "lastResponseState": 1,
        }

        # Add other valid session files (not backups)
        for sid in session_files:
            if sid == SESSION_ID:
                continue
            try:
                entry = _sibling_index_entry(sid)
            except Exception as e:
                # Deliberately best-effort: an unreadable sibling session
                # must not abort the repair of the main session.
                print(f"    Skipping {sid}: {e}")
                continue
            if entry is not None:
                index["entries"][sid] = entry

        write_db_key(conn, "chat.ChatSessionStore.index", index)
        print(f"    Index: {len(index['entries'])} entries")

        # ── Fix model cache ──
        # Only add non-empty sessions to model cache (the main session is
        # always included).
        model_cache = [
            {
                "providerType": "local",
                "providerLabel": "Local",
                "resource": session_resource(sid),
                "icon": "vm",
                "label": entry["title"],
                "status": 1,
                "timing": entry["timing"],
                "initialLocation": entry.get("initialLocation", "panel"),
                "hasPendingEdits": False,
                "isEmpty": entry.get("isEmpty", False),
                "isExternal": False,
                "lastResponseState": 1,
            }
            for sid, entry in index["entries"].items()
            if not entry.get("isEmpty", True) or sid == SESSION_ID
        ]

        write_db_key(conn, "agentSessions.model.cache", model_cache)
        print(f"    Model cache: {len(model_cache)} entries")

        # ── Fix state cache ──
        # Only keep entries for sessions that exist on disk
        state_cache = read_db_key(conn, "agentSessions.state.cache") or []
        valid_resources = {session_resource(sid) for sid in session_files}
        if isinstance(state_cache, list):
            new_state_cache = [
                e
                for e in state_cache
                if isinstance(e, dict) and e.get("resource") in valid_resources
            ]
            removed = len(state_cache) - len(new_state_cache)
            if removed > 0:
                print(f"    State cache: removed {removed} stale entries, keeping {len(new_state_cache)}")
            else:
                print(f"    State cache: {len(new_state_cache)} entries (all valid)")
            write_db_key(conn, "agentSessions.state.cache", new_state_cache)

        # ── Fix memento ──
        memento = read_db_key(conn, "memento/interactive-session-view-copilot")
        if memento and isinstance(memento, dict):
            old_sid = memento.get("sessionId")
            if old_sid != SESSION_ID:
                # Point memento to the main session instead of the empty auto-created one
                memento["sessionId"] = SESSION_ID
                write_db_key(conn, "memento/interactive-session-view-copilot", memento)
                print(f"    Memento: updated sessionId from {old_sid} to {SESSION_ID}")
            else:
                print(f"    Memento: already points to {SESSION_ID}")

        conn.commit()
    finally:
        # Always release the DB handle, including on the error path where
        # nothing has been committed.
        conn.close()

    # ─── Step 5: Clean up orphaned files ─────────────────────
    print("\n[5] Checking for orphaned session files...")
    orphans = session_files - set(index["entries"])
    if orphans:
        for sid in orphans:
            print(f"    Orphan: {sid}.jsonl (not in index, leaving as-is)")
    else:
        print("    No orphaned files")

    # ─── Verification ────────────────────────────────────────
    print("\n" + "=" * 60)
    print("  VERIFICATION")
    print("=" * 60)

    # Re-read and verify
    conn = sqlite3.connect(DB_PATH)
    try:
        v_index = read_db_key(conn, "chat.ChatSessionStore.index")
        v_mcache = read_db_key(conn, "agentSessions.model.cache")
        v_scache = read_db_key(conn, "agentSessions.state.cache")
        v_memento = read_db_key(conn, "memento/interactive-session-view-copilot")
    finally:
        conn.close()

    print(f"\n  Index entries: {len(v_index.get('entries', {}))}")
    for sid, e in v_index.get("entries", {}).items():
        print(f"    {sid[:12]}... title='{e.get('title','')[:40]}' empty={e.get('isEmpty')}")

    print(f"\n  Model cache: {len(v_mcache)} entries")
    for e in v_mcache:
        print(f"    {e.get('label','')[:40]} status={e.get('status')}")

    print(f"\n  State cache: {len(v_scache)} entries")
    print(f"  Memento sessionId: {v_memento.get('sessionId') if v_memento else 'N/A'}")

    # Verify JSONL file
    with open(JSONL_FILE, "r", encoding="utf-8") as f:
        first_line = f.readline().strip()
    jdata = json.loads(first_line)
    jv = jdata["v"]
    print(f"\n  JSONL: kind={jdata['kind']} version={jv.get('version')} requests={len(jv.get('requests',[]))}")
    print(f"  JSONL size: {os.path.getsize(JSONL_FILE):,} bytes")

    print(f"\n  {'='*60}")
    print("  FIX COMPLETE - Restart VS Code to test")
    print(f"  {'='*60}")


# Run the fix only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()