import asyncio
import functools
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from briefcase.integrations.frameworks._export_mixin import ExportMixin
class _RecordExporter(ExportMixin):
def __init__(self, exporter: Any, async_capture: bool) -> None:
self._exporter = exporter
self.async_capture = async_capture
def capture(
fn=None,
*,
decision_type: Optional[str] = None,
context_version: Optional[str] = None,
max_input_chars: int = 1000,
max_output_chars: int = 1000,
exporter: Any = None,
async_capture: bool = True,
):
if fn is not None:
return _make_wrapper(
fn,
decision_type=None,
context_version=None,
max_input_chars=1000,
max_output_chars=1000,
exporter=None,
async_capture=True,
)
def decorator(func):
return _make_wrapper(
func,
decision_type=decision_type,
context_version=context_version,
max_input_chars=max_input_chars,
max_output_chars=max_output_chars,
exporter=exporter,
async_capture=async_capture,
)
return decorator
def _make_wrapper(func, *, decision_type, context_version, max_input_chars,
max_output_chars, exporter, async_capture):
_dt = decision_type or func.__qualname__
_exporter_obj = _RecordExporter(exporter=exporter, async_capture=async_capture)
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
record = _build_record(_dt, context_version, args, kwargs, max_input_chars,
func.__name__)
started_at = datetime.now(timezone.utc)
try:
result = await func(*args, **kwargs)
_finalize_record(record, started_at, result=result,
max_output_chars=max_output_chars)
_exporter_obj._trigger_export(record)
return result
except Exception as exc:
_finalize_record(record, started_at, error=exc,
max_output_chars=max_output_chars)
_exporter_obj._trigger_export(record)
raise
return async_wrapper
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
record = _build_record(_dt, context_version, args, kwargs, max_input_chars,
func.__name__)
started_at = datetime.now(timezone.utc)
try:
result = func(*args, **kwargs)
_finalize_record(record, started_at, result=result,
max_output_chars=max_output_chars)
_exporter_obj._trigger_export(record)
return result
except Exception as exc:
_finalize_record(record, started_at, error=exc,
max_output_chars=max_output_chars)
_exporter_obj._trigger_export(record)
raise
return sync_wrapper
def _build_record(
decision_type: str,
context_version: Optional[str],
args: tuple,
kwargs: dict,
max_input_chars: int,
function_name: str,
) -> dict:
inputs: dict = {}
if args:
inputs["args"] = repr(args)[:max_input_chars]
if kwargs:
inputs["kwargs"] = repr(kwargs)[:max_input_chars]
record = {
"decision_id": str(uuid.uuid4()),
"decision_type": decision_type,
"function_name": function_name,
"inputs": inputs,
"outputs": {},
"started_at": datetime.now(timezone.utc).isoformat(),
}
if context_version is not None:
record["context_version"] = context_version
return record
def _finalize_record(
record: dict,
started_at: datetime,
*,
result: Any = None,
error: Optional[Exception] = None,
max_output_chars: int = 1000,
) -> None:
ended_at = datetime.now(timezone.utc)
record["ended_at"] = ended_at.isoformat()
record["execution_time_ms"] = (ended_at - started_at).total_seconds() * 1000
if error is not None:
record["error"] = str(error)
else:
record["outputs"] = {"result": repr(result)[:max_output_chars]}