import difflib
import json
import logging
import time
from typing import List, Optional
# OpenTelemetry is optional: when the package imports, HAS_OTEL is True and
# `tracer` is a tracer obtained for this module; otherwise the module still
# loads with tracing switched off.
try:
    from opentelemetry import trace
    HAS_OTEL = True
    tracer = trace.get_tracer(__name__)
except ImportError:
    # Package not installed: disable tracing instead of failing the import.
    HAS_OTEL = False
    tracer = None
from briefcase.validation.extractors import Reference
from briefcase.validation.errors import ValidationError, ValidationErrorCode
class ReferenceResolver:
    """Check that extracted references point at objects stored in lakeFS.

    A reference is first mapped to a repository path (directly, through the
    knowledge-base index, or by fuzzy-matching an index key). The path is then
    checked for existence at the commit the branch pointed to when resolution
    started, and finally the optional version carried by the reference is
    compared with the commit reported for that object.

    Attributes:
        lakefs: Client object used for every lakeFS call.
        repository: Name of the repository being validated against.
        branch: Branch whose head commit is pinned for each resolution run.
        kb_index: Mapping of reference names to paths, loaded once in
            ``__init__`` from ``.briefcase/kb_index.json`` (empty on failure).
    """

    # A ``ref.path`` ending in one of these is treated as a literal file path.
    _FILE_EXTENSIONS = ('.pdf', '.md', '.txt', '.docx')
    # Minimum difflib similarity ratio for an index key to count as a match.
    _FUZZY_CUTOFF = 0.6
    # Shortest abbreviated version accepted as a prefix of a longer one.
    _MIN_VERSION_PREFIX = 7
    _GENERIC_REMEDIATION = "Check spelling or verify reference exists in knowledge base"
    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        lakefs_client,
        repository: str,
        branch: str = "main"
    ):
        """Store the connection settings and load the knowledge-base index.

        Args:
            lakefs_client: Object exposing ``read_object``, ``get_commit``,
                ``object_exists`` and ``get_object_commit``.
            repository: Repository name passed to every client call.
            branch: Branch to resolve against.
        """
        self.lakefs = lakefs_client
        self.repository = repository
        self.branch = branch
        self.kb_index = self._load_kb_index()

    def _load_kb_index(self) -> dict:
        """Read ``.briefcase/kb_index.json`` from the branch.

        Returns:
            The decoded JSON object, or ``{}`` when the file cannot be read,
            cannot be parsed, or does not contain a JSON object.
        """
        try:
            index_content = self.lakefs.read_object(
                repository=self.repository,
                branch=self.branch,
                path=".briefcase/kb_index.json"
            )
            index = json.loads(index_content)
        except Exception:
            # Best effort: a missing index only disables name-based lookups,
            # but leave a trace so the degraded mode is diagnosable.
            self._logger.warning(
                "Could not load knowledge-base index; name lookups are disabled",
                exc_info=True,
            )
            return {}
        if not isinstance(index, dict):
            # Later lookups use ``in`` and ``[...]`` with reference names, which
            # only make sense on a mapping.
            self._logger.warning(
                "Knowledge-base index is not a JSON object; ignoring it"
            )
            return {}
        return index

    def resolve_all(
        self,
        references: List[Reference]
    ) -> List[ValidationError]:
        """Resolve every reference and collect the problems found.

        When OpenTelemetry is available the work runs inside a
        ``validation.resolve_references`` span.

        Args:
            references: References to check.

        Returns:
            One ``ValidationError`` per failing reference (empty when all
            resolve), or a single ``LAKEFS_UNAVAILABLE`` error when the branch
            commit cannot be fetched.
        """
        if HAS_OTEL and tracer:
            with tracer.start_as_current_span("validation.resolve_references") as span:
                return self._resolve_all_with_telemetry(references, span)
        else:
            return self._resolve_all_internal(references)

    def _resolve_all_with_telemetry(
        self,
        references: List[Reference],
        span
    ) -> List[ValidationError]:
        """Resolve references and record the outcome on ``span``.

        Sets the attributes ``validation.lakefs.commit`` (``"unknown"`` when
        the commit could not be fetched), ``validation.resolution.time_ms``
        and ``validation.error.count``, and adds one ``validation.error``
        event per error.

        Args:
            references: References to check.
            span: Object providing ``set_attribute`` and ``add_event``.

        Returns:
            The same list ``_resolve_all_internal`` would return.
        """
        # perf_counter is monotonic, so the interval cannot go negative when
        # the wall clock is adjusted mid-run.
        start_time = time.perf_counter()
        errors, commit_sha = self._resolve_pinned(references)
        elapsed_ms = (time.perf_counter() - start_time) * 1000

        # Report the commit that was actually validated against rather than
        # asking lakeFS again (extra round trip, possibly a newer commit).
        span.set_attribute(
            "validation.lakefs.commit",
            commit_sha if commit_sha is not None else "unknown",
        )
        span.set_attribute("validation.resolution.time_ms", elapsed_ms)
        span.set_attribute("validation.error.count", len(errors))
        for error in errors:
            span.add_event(
                "validation.error",
                attributes={
                    "error.code": error.code.value,
                    "error.reference": error.reference
                }
            )
        return errors

    def _resolve_all_internal(
        self,
        references: List[Reference]
    ) -> List[ValidationError]:
        """Resolve references without telemetry; see ``resolve_all``."""
        errors, _ = self._resolve_pinned(references)
        return errors

    def _resolve_pinned(self, references: List[Reference]) -> tuple:
        """Pin the branch head commit and resolve every reference against it.

        Returns:
            ``(errors, commit_sha)``. ``commit_sha`` is ``None`` when the
            commit lookup failed, in which case ``errors`` holds a single
            ``LAKEFS_UNAVAILABLE`` error and no reference was checked.
        """
        try:
            commit_sha = self.lakefs.get_commit(self.repository, self.branch)
        except Exception as e:
            unavailable = ValidationError(
                code=ValidationErrorCode.LAKEFS_UNAVAILABLE,
                message=f"Cannot connect to lakeFS: {str(e)}",
                reference="",
                severity="error",
                layer="resolution"
            )
            return [unavailable], None

        outcomes = (self._resolve_single(ref, commit_sha) for ref in references)
        return [error for error in outcomes if error], commit_sha

    @classmethod
    def _versions_match(cls, left: str, right: str) -> bool:
        """Tell whether two version strings denote the same version.

        Equal strings match. An abbreviated version also matches a longer one
        it is a prefix of, provided it has at least ``_MIN_VERSION_PREFIX``
        characters; the remediation text produced by this class recommends
        such 8-character abbreviations.
        """
        if left == right:
            return True
        shorter, longer = sorted((left, right), key=len)
        return len(shorter) >= cls._MIN_VERSION_PREFIX and longer.startswith(shorter)

    def _resolve_single(
        self,
        ref: Reference,
        commit_sha: str
    ) -> Optional[ValidationError]:
        """Validate one reference against the pinned commit.

        Args:
            ref: Reference exposing ``text``, ``path`` and ``version``.
            commit_sha: Commit used for the existence check.

        Returns:
            ``None`` when the reference is fine; otherwise a
            ``REFERENCE_NOT_FOUND`` or ``LAKEFS_UNAVAILABLE`` error, or a
            ``VERSION_MISMATCH`` warning.
        """
        resolved_path = self._resolve_path(ref)
        if resolved_path is None:
            return ValidationError(
                code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                message=f"Reference '{ref.text}' not found in knowledge base",
                reference=ref.text,
                severity="error",
                layer="resolution",
                remediation=self._suggest_similar(ref)
            )

        try:
            exists = self.lakefs.object_exists(
                repository=self.repository,
                ref=commit_sha,
                path=resolved_path
            )
        except Exception as e:
            return ValidationError(
                code=ValidationErrorCode.LAKEFS_UNAVAILABLE,
                message=f"Failed to check reference: {str(e)}",
                reference=ref.text,
                severity="error",
                layer="resolution"
            )
        if not exists:
            return ValidationError(
                code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                message=f"File '{resolved_path}' not found at commit {commit_sha[:8]}",
                reference=ref.text,
                severity="error",
                layer="resolution",
                remediation="Check if file was moved or deleted"
            )

        # The version check only runs for references that carry a version
        # different from the commit being validated.
        if ref.version and not self._versions_match(ref.version, commit_sha):
            try:
                # NOTE(review): this looks the object up on the branch, not on
                # the pinned commit used above - confirm that is intended.
                actual_version = self.lakefs.get_object_commit(
                    repository=self.repository,
                    ref=self.branch,
                    path=resolved_path
                )
            except Exception:
                # Version information is advisory; a failed lookup must not
                # fail a reference whose existence was already confirmed.
                self._logger.debug(
                    "Version lookup failed for %r", resolved_path, exc_info=True
                )
                return None
            if actual_version and not self._versions_match(actual_version, ref.version):
                return ValidationError(
                    code=ValidationErrorCode.VERSION_MISMATCH,
                    message=f"Version mismatch: prompt references {ref.version[:8]} but current is {actual_version[:8]}",
                    reference=ref.text,
                    severity="warning",
                    layer="resolution",
                    remediation=f"Update prompt to reference @{actual_version[:8]}"
                )
        return None

    def _resolve_path(self, ref: Reference) -> Optional[str]:
        """Map a reference to a repository path.

        Tried in order: ``ref.path`` used verbatim when it contains ``/`` or
        ends with a known file extension; an exact knowledge-base index key;
        the closest index key by difflib similarity.

        Returns:
            The resolved path, or ``None`` when nothing matches.
        """
        if ref.path and ('/' in ref.path or ref.path.endswith(self._FILE_EXTENSIONS)):
            return ref.path
        if ref.path in self.kb_index:
            return self.kb_index[ref.path]
        if ref.path and self.kb_index:
            close_matches = difflib.get_close_matches(
                ref.path,
                self.kb_index.keys(),
                n=1,
                cutoff=self._FUZZY_CUTOFF
            )
            if close_matches:
                return self.kb_index[close_matches[0]]
        return None

    def _suggest_similar(self, ref: Reference) -> str:
        """Build remediation text for a reference that was not found.

        Returns:
            ``"Did you mean: ...?"`` listing up to three similar index keys,
            or a generic hint when the index is empty, the reference has no
            usable text, or nothing is similar enough.
        """
        search_term = ref.path or ref.text
        if not self.kb_index or not search_term:
            return self._GENERIC_REMEDIATION
        close_matches = difflib.get_close_matches(
            search_term,
            self.kb_index.keys(),
            n=3,
            cutoff=self._FUZZY_CUTOFF
        )
        if close_matches:
            suggestions = ", ".join(close_matches)
            return f"Did you mean: {suggestions}?"
        return self._GENERIC_REMEDIATION