import sqlite3
import json
import os
import sys
from pathlib import Path
WS_STORAGE = r"C:\Users\adamm\AppData\Roaming\Code\User\workspaceStorage"
BROKEN_HASH = "5ec71800c69c79b96b06a37e38537907"
WORKING_HASH = "82cdabb21413f2ff42168423e82c8bdf"
def read_jsonl_session(path):
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.strip().split('\n')
result = None
for line in lines:
line = line.strip()
if not line:
continue
objects = []
decoder = json.JSONDecoder()
pos = 0
while pos < len(line):
while pos < len(line) and line[pos] in ' \t':
pos += 1
if pos >= len(line):
break
try:
obj, end_pos = decoder.raw_decode(line, pos)
objects.append(obj)
pos = end_pos
except json.JSONDecodeError:
break
for obj in objects:
kind = obj.get('kind', 0)
if kind == 0:
result = obj.get('value', obj)
return result
def check_request_for_start_bug(req, session_id, req_idx):
issues = []
msg = req.get('message')
if msg is None:
issues.append(f" REQ[{req_idx}]: message is None/missing")
elif not isinstance(msg, str):
if isinstance(msg, dict):
parts = msg.get('parts', [])
if not parts:
issues.append(f" REQ[{req_idx}]: message is dict but has no 'parts': keys={list(msg.keys())}")
for pidx, part in enumerate(parts):
if isinstance(part, dict):
rng = part.get('range')
if rng is not None and not isinstance(rng, dict):
issues.append(f" REQ[{req_idx}]: message.parts[{pidx}].range is {type(rng).__name__}, not dict")
if isinstance(rng, dict) and 'start' not in rng:
issues.append(f" REQ[{req_idx}]: message.parts[{pidx}].range has no 'start': keys={list(rng.keys())}")
erng = part.get('editorRange')
if erng is not None and isinstance(erng, dict):
if 'startLineNumber' not in erng and 'start' not in erng:
issues.append(f" REQ[{req_idx}]: message.parts[{pidx}].editorRange has unexpected keys: {list(erng.keys())}")
else:
issues.append(f" REQ[{req_idx}]: message is {type(msg).__name__}, not string or dict")
refs = req.get('contentReferences', [])
if refs:
for ridx, ref in enumerate(refs):
if isinstance(ref, dict):
rng = ref.get('range')
loc = ref.get('location')
uri = ref.get('uri')
if rng is not None:
if isinstance(rng, dict):
if 'start' not in rng and 'startLineNumber' not in rng:
issues.append(f" REQ[{req_idx}]: contentReferences[{ridx}].range missing start: keys={list(rng.keys())}")
else:
issues.append(f" REQ[{req_idx}]: contentReferences[{ridx}].range is {type(rng).__name__}")
if loc is not None:
if isinstance(loc, dict):
loc_range = loc.get('range')
if loc_range is not None and isinstance(loc_range, dict):
if 'start' not in loc_range and 'startLineNumber' not in loc_range:
issues.append(f" REQ[{req_idx}]: contentReferences[{ridx}].location.range missing start: keys={list(loc_range.keys())}")
kind = ref.get('kind')
value = ref.get('value')
if value is not None and isinstance(value, dict):
v_range = value.get('range')
v_loc = value.get('location')
if v_range is not None and isinstance(v_range, dict):
if 'start' not in v_range and 'startLineNumber' not in v_range:
issues.append(f" REQ[{req_idx}]: contentReferences[{ridx}].value.range missing start")
if v_loc is not None and isinstance(v_loc, dict):
vl_range = v_loc.get('range')
if vl_range is not None and isinstance(vl_range, dict):
if 'start' not in vl_range and 'startLineNumber' not in vl_range:
issues.append(f" REQ[{req_idx}]: contentReferences[{ridx}].value.location.range missing start")
resp = req.get('response', [])
if isinstance(resp, list):
for pidx, part in enumerate(resp):
if isinstance(part, dict):
rng = part.get('range')
if rng is not None and isinstance(rng, dict):
if 'start' not in rng and 'startLineNumber' not in rng:
issues.append(f" REQ[{req_idx}]: response[{pidx}].range missing start")
uri = part.get('uri')
if uri is not None:
if isinstance(uri, dict):
scheme = uri.get('scheme', '')
if not scheme or not scheme.isalpha():
issues.append(f" REQ[{req_idx}]: response[{pidx}].uri has bad scheme: '{scheme}'")
elif isinstance(uri, str):
if ':' in uri:
scheme = uri.split(':')[0]
import re
if not re.match(r'^[a-zA-Z][a-zA-Z0-9+.\-]*$', scheme):
issues.append(f" REQ[{req_idx}]: response[{pidx}].uri has bad scheme: '{scheme}' in '{uri[:80]}'")
value = part.get('value')
if isinstance(value, dict):
v_uri = value.get('uri')
if v_uri is not None and isinstance(v_uri, dict):
scheme = v_uri.get('scheme', '')
if not isinstance(scheme, str) or (scheme and not scheme[0].isalpha()):
issues.append(f" REQ[{req_idx}]: response[{pidx}].value.uri bad scheme: '{scheme}'")
elif resp is not None and not isinstance(resp, list):
issues.append(f" REQ[{req_idx}]: response is {type(resp).__name__}, not list")
vd = req.get('variableData')
if vd is not None and isinstance(vd, dict):
variables = vd.get('variables', [])
for vidx, v in enumerate(variables):
if isinstance(v, dict):
rng = v.get('range')
if rng is not None and isinstance(rng, dict):
if 'start' not in rng and 'startLineNumber' not in rng:
issues.append(f" REQ[{req_idx}]: variableData.variables[{vidx}].range missing start")
uc = req.get('usedContext')
if uc is not None and isinstance(uc, dict):
rng = uc.get('range')
if rng is not None and isinstance(rng, dict):
if 'start' not in rng and 'startLineNumber' not in rng:
issues.append(f" REQ[{req_idx}]: usedContext.range missing start")
efe = req.get('editedFileEvents', [])
if efe:
for eidx, e in enumerate(efe):
if isinstance(e, dict):
uri = e.get('uri')
if uri is not None and isinstance(uri, dict):
scheme = uri.get('scheme', '')
if not isinstance(scheme, str) or (scheme and not scheme[0].isalpha()):
issues.append(f" REQ[{req_idx}]: editedFileEvents[{eidx}].uri bad scheme: '{scheme}'")
return issues
def deep_search_for_start(obj, path=""):
issues = []
if isinstance(obj, dict):
if 'end' in obj and 'start' not in obj:
issues.append(f" {path}: has 'end' but missing 'start': keys={list(obj.keys())}")
if 'range' in obj and obj['range'] is None:
issues.append(f" {path}.range: is null (should be object with start/end)")
if 'scheme' in obj:
scheme = obj.get('scheme', '')
if isinstance(scheme, str) and scheme:
import re
if not re.match(r'^[a-zA-Z][a-zA-Z0-9+.\-]*$', scheme):
issues.append(f" {path}.scheme: illegal characters: '{scheme}'")
for k, v in obj.items():
issues.extend(deep_search_for_start(v, f"{path}.{k}"))
elif isinstance(obj, list):
for i, item in enumerate(obj):
issues.extend(deep_search_for_start(item, f"{path}[{i}]"))
return issues
def check_db_values(ws_hash, label):
db_path = os.path.join(WS_STORAGE, ws_hash, "state.vscdb")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT key, value FROM ItemTable WHERE key LIKE '%chat%' OR key LIKE '%session%' OR key LIKE '%interactive%' OR key LIKE '%agent%'")
rows = cursor.fetchall()
print(f"\n{'='*80}")
print(f"DB VALUE CHECK: {label} ({ws_hash})")
print(f"{'='*80}")
for key, value in rows:
if value is None:
continue
try:
data = json.loads(value)
except (json.JSONDecodeError, TypeError):
continue
issues = deep_search_for_start(data, key)
if issues:
print(f"\n KEY: {key}")
for issue in issues:
print(issue)
conn.close()
def check_sessions(ws_hash, label):
sessions_dir = os.path.join(WS_STORAGE, ws_hash, "chatSessions")
if not os.path.isdir(sessions_dir):
print(f"\n No chatSessions dir for {label}")
return
print(f"\n{'='*80}")
print(f"SESSION JSONL CHECK: {label} ({ws_hash})")
print(f"{'='*80}")
for fname in os.listdir(sessions_dir):
if not fname.endswith('.jsonl'):
continue
session_id = fname.replace('.jsonl', '')
fpath = os.path.join(sessions_dir, fname)
try:
session = read_jsonl_session(fpath)
except Exception as e:
print(f"\n SESSION {session_id}: ERROR reading: {e}")
continue
if session is None:
print(f"\n SESSION {session_id}: No data parsed")
continue
requests = session.get('requests', [])
all_issues = []
for idx, req in enumerate(requests):
issues = check_request_for_start_bug(req, session_id, idx)
all_issues.extend(issues)
deep_issues = deep_search_for_start(session, f"session({session_id[:8]})")
if all_issues or deep_issues:
print(f"\n SESSION {session_id}:")
print(f" File size: {os.path.getsize(fpath)} bytes")
print(f" Requests: {len(requests)}")
if all_issues:
print(f" Request issues ({len(all_issues)}):")
for issue in all_issues[:20]:
print(f" {issue}")
if len(all_issues) > 20:
print(f" ... and {len(all_issues) - 20} more")
if deep_issues:
print(f" Deep search issues ({len(deep_issues)}):")
for issue in deep_issues[:20]:
print(f" {issue}")
if len(deep_issues) > 20:
print(f" ... and {len(deep_issues) - 20} more")
else:
print(f"\n SESSION {session_id}: OK ({len(requests)} requests)")
print("Checking BROKEN workspace (Agentic)...")
check_db_values(BROKEN_HASH, "Agentic (BROKEN)")
check_sessions(BROKEN_HASH, "Agentic (BROKEN)")
print("\n\nChecking WORKING workspace (chasm) for comparison...")
check_db_values(WORKING_HASH, "chasm (WORKING)")
check_sessions(WORKING_HASH, "chasm (WORKING)")