chasm-cli 1.5.4

Universal chat session manager - harvest, merge, and analyze AI chat history from VS Code, Cursor, and other editors
Documentation
"""Post-restart diagnosis: check if VS Code has overwritten our repairs."""
import json, sqlite3, os, base64
from pathlib import Path

WS = Path(r"C:\Users\adamm\AppData\Roaming\Code\User\workspaceStorage")

# Check a few broken workspaces
TARGETS = {
    "Agentic": ("5ec71800c69c79b96b06a37e38537907", "6be29cba-331e-4aa4-bc58-659cc20f4800"),
    "Cyborg": ("cc60bfebb242bac1578d7b49a44033db", "527646cd-d49e-4b0c-b51b-83abd51f9080"),
    "XWERX_B001": ("05db3446971342e7f6a633a732955575", "6ecda819-b49a-45bd-9b46-dc140c2b30e2"),
}
WORKING = ("chasm", "82cdabb21413f2ff42168423e82c8bdf", "44f0cf62-331e-43c9-b32c-25d91ebab0b8")

def check_workspace(name, h, sid):
    print(f"\n{'='*60}")
    print(f"{name} ({sid[:8]})")
    print(f"{'='*60}")
    
    db = WS / h / "state.vscdb"
    jsonl = WS / h / "chatSessions" / f"{sid}.jsonl"
    
    # 1. Check if WAL exists (VS Code is holding the DB open)
    wal = WS / h / "state.vscdb-wal"
    wal_size = wal.stat().st_size if wal.exists() else 0
    print(f"  WAL: {wal_size}b {'*** ACTIVE WAL ***' if wal_size > 0 else '(none)'}")
    
    # 2. Check index entry
    conn = sqlite3.connect(str(db))
    cur = conn.cursor()
    cur.execute("SELECT value FROM ItemTable WHERE key='chat.ChatSessionStore.index'")
    row = cur.fetchone()
    if row:
        idx = json.loads(row[0])
        entries = idx.get("entries", {})
        entry = entries.get(sid)
        if entry:
            print(f"  Index: hpe={entry.get('hasPendingEdits')} lrs={entry.get('lastResponseState')} empty={entry.get('isEmpty')}")
            print(f"    title: {entry.get('title','?')[:60]}")
            print(f"    isImported: {entry.get('isImported')}")
            print(f"    isExternal: {entry.get('isExternal')}")
            print(f"    initialLocation: {entry.get('initialLocation')}")
            print(f"    timing: {json.dumps(entry.get('timing', {}))}")
        else:
            print(f"  Index: *** SESSION NOT IN INDEX ***")
            print(f"  Available IDs: {list(entries.keys())}")
    else:
        print(f"  Index: *** NO INDEX KEY ***")
    
    # 3. Check WAL content if it exists and is large
    if wal_size > 0:
        print(f"  *** Reading WAL to check if it overrides our data ***")
        # WAL may contain pending writes that haven't been checkpointed
        # We can't easily read WAL directly, but SQLite will use it automatically
        # The connection above should already include WAL data
    
    # 4. Check JSONL file
    if jsonl.exists():
        with open(jsonl, "r", encoding="utf-8") as f:
            content = f.read()
        snap = json.loads(content.strip().split("\n")[0])
        v = snap.get("v", snap)
        print(f"  File: {len(content)}b, kind={snap.get('kind')}")
        print(f"  v.sessionId: {v.get('sessionId')}")
        print(f"  v.version: {v.get('version')}")
        print(f"  v.hasPendingEdits: {v.get('hasPendingEdits')}")
        print(f"  v.pendingRequests: {len(v.get('pendingRequests', []))} items")
        reqs = v.get("requests", [])
        print(f"  v.requests: {len(reqs)}")
        if reqs:
            last = reqs[-1]
            ms = last.get("modelState", {}).get("value", "MISSING")
            print(f"  last_request.modelState.value: {ms}")
            
            # Check response structure
            resp_parts = last.get("response", [])
            print(f"  last_request.response: {len(resp_parts)} parts, type={type(resp_parts).__name__}")
            if resp_parts and isinstance(resp_parts, list) and isinstance(resp_parts[0], dict):
                print(f"    part[0].kind: {resp_parts[0].get('kind')}")
        
        # Check responderUsername
        print(f"  v.responderUsername: {v.get('responderUsername', 'MISSING')}")
        
        # Check customTitle
        print(f"  v.customTitle: {v.get('customTitle', 'MISSING')}")
        
        # Check initialLocation
        print(f"  v.initialLocation: {v.get('initialLocation', 'MISSING')}")
        
        # Check inputState
        is_data = v.get("inputState", {})
        if is_data:
            print(f"  v.inputState keys: {sorted(is_data.keys())}")
        else:
            print(f"  v.inputState: MISSING or empty")
    else:
        print(f"  File: *** NOT FOUND ***")
    
    # 5. Check agentSessions caches for this session
    for cache_key in ["agentSessions.model.cache", "agentSessions.state.cache"]:
        cur.execute("SELECT value FROM ItemTable WHERE key=?", (cache_key,))
        row = cur.fetchone()
        if row:
            cache = json.loads(row[0])
            found = False
            for entry in cache:
                if isinstance(entry, dict):
                    resource = entry.get("resource", "")
                    if "vscode-chat-session://local/" in resource:
                        b64 = resource.split("vscode-chat-session://local/")[1]
                        try:
                            decoded = base64.b64decode(b64).decode("utf-8")
                            if decoded == sid:
                                found = True
                                print(f"  {cache_key}: FOUND (status={entry.get('status','?')}, archived={entry.get('archived','?')})")
                                break
                        except:
                            pass
            if not found:
                print(f"  {cache_key}: NOT FOUND for this session ({len(cache)} total entries)")
    
    conn.close()

# Check working workspace first
check_workspace(*WORKING)

# Check broken workspaces
for name, (h, sid) in TARGETS.items():
    check_workspace(name, h, sid)

# BONUS: Compare full snapshot of working vs broken byte-for-byte, focusing on
# the `v` wrapper structure
print(f"\n{'='*60}")
print("SNAPSHOT FORMAT COMPARISON")
print(f"{'='*60}")

def show_snap_format(name, h, sid):
    jsonl = WS / h / "chatSessions" / f"{sid}.jsonl"
    if not jsonl.exists():
        print(f"\n{name}: FILE NOT FOUND")
        return
    with open(jsonl, "r", encoding="utf-8") as f:
        first_line = f.readline().strip()
    snap = json.loads(first_line)
    
    print(f"\n{name}:")
    print(f"  Top-level keys: {sorted(snap.keys())}")
    print(f"  kind={snap.get('kind')}")
    
    v = snap.get("v")
    if v is None:
        print(f"  *** NO 'v' WRAPPER - data is at top level ***")
        v = snap
    else:
        print(f"  Has 'v' wrapper: yes")
    
    print(f"  v keys: {sorted(v.keys())}")
    print(f"  v.version: {v.get('version')}")
    
    # Check if requests are properly under v
    if "requests" in v:
        print(f"  v.requests: {len(v['requests'])} items")
    elif "requests" in snap and snap != v:
        print(f"  *** requests at TOP level, not under v ***")
    else:
        print(f"  NO requests found anywhere")

show_snap_format("chasm (WORKING)", "82cdabb21413f2ff42168423e82c8bdf", "44f0cf62-331e-43c9-b32c-25d91ebab0b8")

for name, (h, sid) in TARGETS.items():
    show_snap_format(name, h, sid)

print("\n=== DONE ===")