chasm-cli 1.5.4

Universal chat session manager - harvest, merge, and analyze AI chat history from VS Code, Cursor, and other editors
Documentation
"""
Find the exact data causing the error
"Cannot read properties of undefined (reading 'start')"
in VS Code's chat data parser.

Based on VS Code source analysis:
- O(e) iterates requests and calls P(i) for each
- P(e) calls LAs(e.message) when message is not a string
- P(e) processes contentReferences via Oo(l)
- P(e) creates response from e.response array

We need to find requests where:
1. message is not a string (could cause LAs to fail on bad structure)
2. contentReferences contain objects with bad range (missing .start)
3. response parts have bad range/position data
4. Any nested object structure that expects .start but has undefined parent
"""

import sqlite3
import json
import os
import sys
from pathlib import Path

# Root folder that holds one sub-directory per workspace hash. check_db_values()
# and check_sessions() join a hash onto this path to locate "state.vscdb" and the
# "chatSessions" directory. NOTE: hard-coded to one specific user profile path.
WS_STORAGE = r"C:\Users\adamm\AppData\Roaming\Code\User\workspaceStorage"

# Hash (sub-directory name under WS_STORAGE) of the workspace whose chat data
# fails to load; reported under the label "Agentic (BROKEN)" by the run section.
BROKEN_HASH = "5ec71800c69c79b96b06a37e38537907"
# Hash of a workspace used as a known-good baseline; reported under the label
# "chasm (WORKING)" so its output can be compared with the broken one.
WORKING_HASH = "82cdabb21413f2ff42168423e82c8bdf"

def read_jsonl_session(path):
    """Read a session JSONL file and return its last full snapshot.

    Each non-empty line may hold one JSON value or several JSON values
    concatenated on the same line (separated by spaces/tabs). Every decoded
    JSON object whose ``kind`` is 0 (or missing) replaces the current result
    with its ``value`` field, falling back to the object itself when there is
    no ``value``. Objects with any other ``kind`` (delta/splice updates) are
    ignored, so only compacted data is returned.

    Args:
        path: Path of the ``.jsonl`` file, read as UTF-8.

    Returns:
        The value of the last kind-0 record, or ``None`` when the file
        contains no such record.

    Raises:
        OSError: If the file cannot be opened or read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    decoder = json.JSONDecoder()
    result = None

    with open(path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue

            # Handle concatenated JSON on a single line: decode value after
            # value until the line is exhausted.
            pos = 0
            length = len(line)
            while pos < length:
                # raw_decode() does not skip leading whitespace itself.
                while pos < length and line[pos] in ' \t':
                    pos += 1
                if pos >= length:
                    break
                try:
                    obj, pos = decoder.raw_decode(line, pos)
                except json.JSONDecodeError:
                    # The rest of this line is not parseable; move on.
                    break

                # Only JSON objects can be records. Arrays, numbers and
                # strings have no 'kind' and must not abort the whole read.
                if not isinstance(obj, dict):
                    continue
                if obj.get('kind', 0) == 0:
                    result = obj.get('value', obj)
                # Delta/splice records are skipped - we care about compacted data.

    return result

# URI scheme syntax: a letter followed by letters, digits, '+', '.' or '-'.
_URI_SCHEME_PATTERN = r'^[a-zA-Z][a-zA-Z0-9+.\-]*$'


def _range_lacks_start(rng):
    """Return True when *rng* is a dict with neither 'start' nor 'startLineNumber'."""
    return (
        isinstance(rng, dict)
        and 'start' not in rng
        and 'startLineNumber' not in rng
    )


def _scheme_is_bad(scheme, allow_empty=False):
    """Return True when *scheme* is not a syntactically valid URI scheme.

    Non-string values are always bad. An empty string is bad unless
    *allow_empty* is true.
    """
    import re  # local import: the module header does not import ``re``

    if not isinstance(scheme, str):
        return True
    if not scheme:
        return not allow_empty
    return re.match(_URI_SCHEME_PATTERN, scheme) is None


def check_request_for_start_bug(req, session_id, req_idx):
    """Check one request object for shapes that could trigger ``.start`` on undefined.

    The checks cover, in order: the ``message`` field and its ``parts``,
    ``contentReferences``, ``response`` parts (ranges and URI schemes),
    ``variableData.variables``, ``usedContext`` and ``editedFileEvents``.
    Fields that are absent or have an unexpected container type are skipped
    instead of raising, so one malformed request cannot abort the scan.

    Args:
        req: The decoded request (expected to be a dict).
        session_id: Identifier of the owning session. Currently unused; kept
            so existing callers keep working.
        req_idx: Index of the request inside its session, used in messages.

    Returns:
        A list of human-readable issue strings, each prefixed with
        ``"  REQ[<req_idx>]"``. Empty when nothing suspicious was found.
    """
    issues = []
    prefix = f"  REQ[{req_idx}]"

    if not isinstance(req, dict):
        issues.append(f"{prefix}: request is {type(req).__name__}, not dict")
        return issues

    # 1. Check message field
    msg = req.get('message')
    if msg is None:
        issues.append(f"{prefix}: message is None/missing")
    elif isinstance(msg, dict):
        # Structured message - check if it has the expected fields.
        parts = msg.get('parts', [])
        if not parts:
            issues.append(f"{prefix}: message is dict but has no 'parts': keys={list(msg.keys())}")
        elif not isinstance(parts, list):
            issues.append(f"{prefix}: message.parts is {type(parts).__name__}, not list")
        else:
            for pidx, part in enumerate(parts):
                if not isinstance(part, dict):
                    continue
                where = f"{prefix}: message.parts[{pidx}]"
                # Message part ranges are only accepted with a 'start' key.
                rng = part.get('range')
                if rng is not None and not isinstance(rng, dict):
                    issues.append(f"{where}.range is {type(rng).__name__}, not dict")
                elif isinstance(rng, dict) and 'start' not in rng:
                    issues.append(f"{where}.range has no 'start': keys={list(rng.keys())}")
                erng = part.get('editorRange')
                if _range_lacks_start(erng):
                    issues.append(f"{where}.editorRange has unexpected keys: {list(erng.keys())}")
    elif not isinstance(msg, str):
        issues.append(f"{prefix}: message is {type(msg).__name__}, not string or dict")

    # 2. Check contentReferences
    refs = req.get('contentReferences', [])
    if isinstance(refs, list):
        for ridx, ref in enumerate(refs):
            if not isinstance(ref, dict):
                continue
            where = f"{prefix}: contentReferences[{ridx}]"

            rng = ref.get('range')
            if rng is not None:
                if not isinstance(rng, dict):
                    issues.append(f"{where}.range is {type(rng).__name__}")
                elif _range_lacks_start(rng):
                    issues.append(f"{where}.range missing start: keys={list(rng.keys())}")

            loc = ref.get('location')
            if isinstance(loc, dict):
                loc_range = loc.get('range')
                if _range_lacks_start(loc_range):
                    issues.append(f"{where}.location.range missing start: keys={list(loc_range.keys())}")

            # Nested kind/value structure
            value = ref.get('value')
            if isinstance(value, dict):
                if _range_lacks_start(value.get('range')):
                    issues.append(f"{where}.value.range missing start")
                v_loc = value.get('location')
                if isinstance(v_loc, dict) and _range_lacks_start(v_loc.get('range')):
                    issues.append(f"{where}.value.location.range missing start")

    # 3. Check response parts
    resp = req.get('response', [])
    if isinstance(resp, list):
        for pidx, part in enumerate(resp):
            if not isinstance(part, dict):
                continue
            where = f"{prefix}: response[{pidx}]"

            if _range_lacks_start(part.get('range')):
                issues.append(f"{where}.range missing start")

            uri = part.get('uri')
            if isinstance(uri, dict):
                # A missing/empty scheme is reported here. The scheme is
                # matched against the full pattern so hyphenated schemes
                # such as 'vscode-remote' are not flagged.
                scheme = uri.get('scheme', '')
                if _scheme_is_bad(scheme):
                    issues.append(f"{where}.uri has bad scheme: '{scheme}'")
            elif isinstance(uri, str) and ':' in uri:
                scheme = uri.partition(':')[0]
                if _scheme_is_bad(scheme):
                    issues.append(f"{where}.uri has bad scheme: '{scheme}' in '{uri[:80]}'")

            # Check value.uri for nested structures (empty scheme tolerated).
            value = part.get('value')
            if isinstance(value, dict):
                v_uri = value.get('uri')
                if isinstance(v_uri, dict):
                    scheme = v_uri.get('scheme', '')
                    if _scheme_is_bad(scheme, allow_empty=True):
                        issues.append(f"{where}.value.uri bad scheme: '{scheme}'")
    elif resp is not None:
        issues.append(f"{prefix}: response is {type(resp).__name__}, not list")

    # 4. Check variableData
    vd = req.get('variableData')
    if isinstance(vd, dict):
        variables = vd.get('variables', [])
        if isinstance(variables, list):
            for vidx, var in enumerate(variables):
                if isinstance(var, dict) and _range_lacks_start(var.get('range')):
                    issues.append(f"{prefix}: variableData.variables[{vidx}].range missing start")

    # 5. Check usedContext
    uc = req.get('usedContext')
    if isinstance(uc, dict) and _range_lacks_start(uc.get('range')):
        issues.append(f"{prefix}: usedContext.range missing start")

    # 6. Check editedFileEvents (empty scheme tolerated)
    efe = req.get('editedFileEvents', [])
    if isinstance(efe, list):
        for eidx, event in enumerate(efe):
            if not isinstance(event, dict):
                continue
            uri = event.get('uri')
            if isinstance(uri, dict):
                scheme = uri.get('scheme', '')
                if _scheme_is_bad(scheme, allow_empty=True):
                    issues.append(f"{prefix}: editedFileEvents[{eidx}].uri bad scheme: '{scheme}'")

    return issues

def deep_search_for_start(obj, path=""):
    """Walk a decoded JSON value and report shapes where ``.start`` access could fail.

    Every dict encountered is tested for three things, in this order:
    an 'end' key without a 'start' key, a 'range' key whose value is null,
    and a non-empty string 'scheme' that does not look like a URI scheme.
    Dict values and list items are then visited depth-first, extending
    *path* with ``.key`` or ``[index]``.

    Returns a list of issue strings in visiting order.
    """
    import re

    def _walk(node, where):
        # Yields issues for *node* first, then for its children.
        if isinstance(node, dict):
            # VS Code Range: {start: Position, end: Position}
            if 'start' not in node and 'end' in node:
                yield f"  {where}: has 'end' but missing 'start': keys={list(node.keys())}"

            if 'range' in node and node['range'] is None:
                yield f"  {where}.range: is null (should be object with start/end)"

            if 'scheme' in node:
                candidate = node.get('scheme', '')
                is_checkable = isinstance(candidate, str) and bool(candidate)
                if is_checkable and not re.match(r'^[a-zA-Z][a-zA-Z0-9+.\-]*$', candidate):
                    yield f"  {where}.scheme: illegal characters: '{candidate}'"

            for key, child in node.items():
                yield from _walk(child, f"{where}.{key}")
        elif isinstance(node, list):
            for index, child in enumerate(node):
                yield from _walk(child, f"{where}[{index}]")

    return list(_walk(obj, path))

def check_db_values(ws_hash, label):
    """Scan chat-related values in a workspace's ``state.vscdb`` and print findings.

    Rows of ``ItemTable`` whose key mentions chat, session, interactive or
    agent are loaded. Every value that parses as JSON is passed through
    ``deep_search_for_start`` and any issues are printed under the row key.
    Values that are NULL or not valid JSON are skipped.

    Args:
        ws_hash: Workspace hash, i.e. the sub-directory name under WS_STORAGE.
        label: Human-readable workspace name used in the printed output.

    Returns:
        None. Results are written to stdout.
    """
    db_path = os.path.join(WS_STORAGE, ws_hash, "state.vscdb")

    # sqlite3.connect() would silently create an empty database file when the
    # path does not exist; a diagnostic tool must not write into the workspace.
    if not os.path.isfile(db_path):
        print(f"\n  No state.vscdb for {label}")
        return

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT key, value FROM ItemTable WHERE key LIKE '%chat%' OR key LIKE '%session%' OR key LIKE '%interactive%' OR key LIKE '%agent%'")
        rows = cursor.fetchall()
    except sqlite3.Error as exc:
        # Report and return so the remaining workspaces can still be checked.
        print(f"\n  DB {label}: ERROR reading {db_path}: {exc}")
        return
    finally:
        # Always release the file handle, even when the query fails.
        conn.close()

    print(f"\n{'='*80}")
    print(f"DB VALUE CHECK: {label} ({ws_hash})")
    print(f"{'='*80}")

    for key, value in rows:
        if value is None:
            continue
        try:
            data = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            # Not JSON (or not a str/bytes value) - nothing to inspect.
            continue

        issues = deep_search_for_start(data, key)
        if issues:
            print(f"\n  KEY: {key}")
            for issue in issues:
                print(issue)

def _print_issue_group(title, issues, limit=20):
    """Print a titled issue list, showing at most *limit* entries."""
    print(f"    {title} ({len(issues)}):")
    for issue in issues[:limit]:
        print(f"      {issue}")
    if len(issues) > limit:
        print(f"      ... and {len(issues) - limit} more")


def check_sessions(ws_hash, label):
    """Check every ``*.jsonl`` file in a workspace's ``chatSessions`` directory.

    Each file is loaded with ``read_jsonl_session``. Every request is passed
    to ``check_request_for_start_bug`` and the whole session to
    ``deep_search_for_start``. Sessions with findings are printed with their
    file size, request count and up to 20 issues per category; clean
    sessions are printed as OK. Files are processed in sorted name order so
    repeated runs produce comparable output.

    Args:
        ws_hash: Workspace hash, i.e. the sub-directory name under WS_STORAGE.
        label: Human-readable workspace name used in the printed output.

    Returns:
        None. Results are written to stdout.
    """
    sessions_dir = os.path.join(WS_STORAGE, ws_hash, "chatSessions")
    if not os.path.isdir(sessions_dir):
        print(f"\n  No chatSessions dir for {label}")
        return

    print(f"\n{'='*80}")
    print(f"SESSION JSONL CHECK: {label} ({ws_hash})")
    print(f"{'='*80}")

    suffix = '.jsonl'
    for fname in sorted(os.listdir(sessions_dir)):
        if not fname.endswith(suffix):
            continue
        # Strip only the trailing extension, not every '.jsonl' in the name.
        session_id = fname[:-len(suffix)]
        fpath = os.path.join(sessions_dir, fname)

        try:
            session = read_jsonl_session(fpath)
        except Exception as e:
            # Best-effort scan: report the unreadable file and keep going.
            print(f"\n  SESSION {session_id}: ERROR reading: {e}")
            continue

        if session is None:
            print(f"\n  SESSION {session_id}: No data parsed")
            continue

        all_issues = []

        # A malformed snapshot is a finding in itself, not a reason to crash.
        if isinstance(session, dict):
            requests = session.get('requests', [])
            if not isinstance(requests, list):
                all_issues.append(f"  requests is {type(requests).__name__}, not list")
                requests = []
        else:
            all_issues.append(f"  session root is {type(session).__name__}, not dict")
            requests = []

        for idx, req in enumerate(requests):
            all_issues.extend(check_request_for_start_bug(req, session_id, idx))

        # Deep search the entire session
        deep_issues = deep_search_for_start(session, f"session({session_id[:8]})")

        if all_issues or deep_issues:
            print(f"\n  SESSION {session_id}:")
            print(f"    File size: {os.path.getsize(fpath)} bytes")
            print(f"    Requests: {len(requests)}")
            if all_issues:
                _print_issue_group("Request issues", all_issues)
            if deep_issues:
                _print_issue_group("Deep search issues", deep_issues)
        else:
            print(f"\n  SESSION {session_id}: OK ({len(requests)} requests)")

# Run checks: the broken workspace first, then the working one as a baseline.
# For each workspace the DB values are inspected before the session files.
for _banner, _ws_hash, _label in (
    ("Checking BROKEN workspace (Agentic)...", BROKEN_HASH, "Agentic (BROKEN)"),
    ("\n\nChecking WORKING workspace (chasm) for comparison...", WORKING_HASH, "chasm (WORKING)"),
):
    print(_banner)
    check_db_values(_ws_hash, _label)
    check_sessions(_ws_hash, _label)