import json
import logging
from typing import Any, Dict, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Substrings that mark a tool/function name as PageIndex-related. They are
# matched with ``in`` against the lowercased record name, so every entry
# must itself be lowercase.
_PAGEINDEX_TOOL_KEYWORDS = (
"pageindex",
"page_index",
"pi_search",
"pi_chat",
"pi_retrieve",
)
# Top-level keys whose presence in a JSON-object output marks the record as a
# PageIndex response.
_PAGEINDEX_RESPONSE_KEYS = ("doc_id", "retrieval_id")
class PageIndexMCPObserver:
    """Detect PageIndex tool-call records and annotate them in place.

    A record is a plain dict. It counts as a PageIndex record when its
    tool/function name contains one of ``_PAGEINDEX_TOOL_KEYWORDS``, or when
    its output parses as a JSON object that has a ``doc_id`` /
    ``retrieval_id`` key, a ``nodes`` list, or a ``tree`` dict.

    The public methods are best-effort: they never raise. Internal failures
    are logged at DEBUG level and reported as ``False``.
    """

    def __init__(self) -> None:
        # Number of observe() calls, whether or not the record matched.
        self._observed_count: int = 0
        # Number of records that were recognised and fully enriched.
        self._enriched_count: int = 0

    def observe(self, record: Dict[str, Any]) -> bool:
        """Inspect *record* and enrich it in place if it is a PageIndex record.

        Returns:
            True when the record was recognised and the ``pageindex.*`` keys
            were written; False otherwise, including on any internal error.
        """
        self._observed_count += 1
        try:
            if not self._is_pageindex_record(record):
                return False
            self._enrich_record(record)
        except Exception:
            # Observation must never break the caller, but a swallowed error
            # should still be discoverable when debugging.
            logger.debug("PageIndex observation failed", exc_info=True)
            return False
        self._enriched_count += 1
        return True

    def is_pageindex_mcp_response(self, record: Dict[str, Any]) -> bool:
        """Return True if *record* looks like a PageIndex record.

        Unlike :meth:`observe`, this neither mutates the record nor touches
        the counters. Any internal error is reported as False.
        """
        try:
            return self._is_pageindex_record(record)
        except Exception:
            logger.debug("PageIndex detection failed", exc_info=True)
            return False

    @property
    def observed_count(self) -> int:
        """Total number of records passed to :meth:`observe`."""
        return self._observed_count

    @property
    def enriched_count(self) -> int:
        """Number of records :meth:`observe` successfully enriched."""
        return self._enriched_count

    def _is_pageindex_record(self, record: Dict[str, Any]) -> bool:
        """Classify *record* by its tool name first, then by its output shape."""
        if not isinstance(record, dict):
            return False

        # First non-empty string among the known name fields. Values that are
        # None or otherwise not strings are skipped instead of crashing on
        # ``.lower()``, so a malformed name cannot hide a recognisable output.
        name = ""
        for key in ("function_name", "tool_name", "name"):
            candidate = record.get(key)
            if isinstance(candidate, str) and candidate:
                name = candidate.lower()
                break
        if any(kw in name for kw in _PAGEINDEX_TOOL_KEYWORDS):
            return True

        # Fall back to sniffing the output payload.
        output_str = _extract_output_str(record)
        if not output_str:
            return False
        parsed = _try_parse_json(output_str)
        if not isinstance(parsed, dict):
            return False
        if any(key in parsed for key in _PAGEINDEX_RESPONSE_KEYS):
            return True
        if isinstance(parsed.get("nodes"), list):
            return True
        return isinstance(parsed.get("tree"), dict)

    def _enrich_record(self, record: Dict[str, Any]) -> None:
        """Write the ``pageindex.*`` keys onto *record*.

        Keys written: ``pageindex.doc_id``, ``pageindex.retrieval_method``,
        ``pageindex.tree.depth``, ``pageindex.tree.nodes_visited``,
        ``pageindex.tree.path`` and ``pageindex.tree.backtrack_count``.
        """
        output_str = _extract_output_str(record)
        parsed: Optional[Dict[str, Any]] = None
        if output_str:
            candidate = _try_parse_json(output_str)
            # Only a JSON object is usable; arrays and scalars count as
            # "no parsed output".
            if isinstance(candidate, dict):
                parsed = candidate

        # Compute everything before the first write so a failure here leaves
        # the record untouched.
        doc_id = _extract_doc_id(parsed, record)
        tree_meta = _extract_tree_metadata(parsed)

        record["pageindex.doc_id"] = doc_id
        record["pageindex.retrieval_method"] = "tree_search"
        record["pageindex.tree.depth"] = tree_meta.get("depth", 0)
        record["pageindex.tree.nodes_visited"] = tree_meta.get("nodes_visited", 0)
        record["pageindex.tree.path"] = tree_meta.get("path", "")
        # Always written as 0: nothing here derives a backtrack count.
        record["pageindex.tree.backtrack_count"] = 0
def _extract_output_str(record: Dict[str, Any]) -> str:
outputs = record.get("outputs") or {}
if not isinstance(outputs, dict):
return ""
return outputs.get("output", "") or outputs.get("content", "") or ""
def _try_parse_json(text: str) -> Optional[Any]:
try:
return json.loads(text)
except (json.JSONDecodeError, TypeError, ValueError):
return None
def _extract_doc_id(
parsed: Optional[Dict[str, Any]], record: Dict[str, Any]
) -> str:
if isinstance(parsed, dict):
doc_id = parsed.get("doc_id", "")
if doc_id:
return str(doc_id)
inputs = record.get("inputs") or {}
if isinstance(inputs, dict):
input_str = inputs.get("input", "") or ""
input_parsed = _try_parse_json(input_str)
if isinstance(input_parsed, dict):
doc_id = input_parsed.get("doc_id", "")
if doc_id:
return str(doc_id)
return ""
def _compute_tree_depth(node: Dict[str, Any], current_depth: int = 0) -> int:
children = node.get("nodes", [])
if not children:
return current_depth
return max(_compute_tree_depth(child, current_depth + 1) for child in children)
def _count_tree_nodes(node: Dict[str, Any]) -> int:
children = node.get("nodes", [])
return 1 + sum(_count_tree_nodes(child) for child in children)
def _build_tree_path(node: Dict[str, Any], max_sections: int = 3) -> str:
parts = []
root_title = node.get("title") or node.get("node_id") or "root"
parts.append(str(root_title))
children = node.get("nodes", [])
for child in children[:max_sections]:
child_title = child.get("title") or child.get("node_id") or "node"
parts.append(str(child_title))
if len(children) > max_sections:
parts.append(f"... ({len(children) - max_sections} more)")
return " > ".join(parts)
def _extract_tree_metadata(parsed: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not isinstance(parsed, dict):
return {}
try:
tree: Optional[Dict[str, Any]] = None
if isinstance(parsed.get("tree"), dict):
tree = parsed["tree"]
elif "nodes" in parsed and isinstance(parsed.get("nodes"), list):
tree = parsed
if tree is None:
return {}
return {
"depth": _compute_tree_depth(tree),
"nodes_visited": _count_tree_nodes(tree),
"path": _build_tree_path(tree),
}
except Exception:
return {}