from __future__ import annotations
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import csv
import json
import re
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
from briefcase.integrations.lakefs import ArtifactLineageClient
try:
from pypdf import PdfReader
HAS_PYPDF = True
except ImportError:
HAS_PYPDF = False
PathLike = Union[str, Path]
SOURCE_OBJECT_PATHS = {
"A": "Clients/Active/Triton Capital Partners LP/Onboarding/"
"Triton_Capital_NewAcct_Application_2026.pdf",
"B": "Clients/Active/Triton Capital Partners LP/Onboarding/"
"Triton_Capital_BeneficialOwnership_Cert.pdf",
"C": "Clients/Active/Triton Capital Partners LP/Onboarding/"
"Triton_Capital_Formation_Docs.pdf",
"D": "Clients/Active/Triton Capital Partners LP/Onboarding/"
"Triton_Capital_AuthorizedTraders.pdf",
"E": "Clients/Active/Triton Capital Partners LP/Onboarding/Triton_Capital_W8BENE.pdf",
}
PLAYBOOK_OBJECT_PATH = "Playbooks/Institutional_Client_Onboarding_PB_v4.2.json"
ANALYTICS_OBJECT_PATH = "Analytics/Meridian_Submission_Pattern_History.csv"
ANALYSIS_SNAPSHOT_OBJECT_PATH = "Analytics/Triton_Capital_Rule_Evaluation_2026_02_03.json"
MEMO_ARTIFACT_OBJECT_PATH = "AuditArtifacts/DECISION_MEMO_Triton_Capital_Partners_LP.md"
RESULT_ARTIFACT_OBJECT_PATH = "AuditArtifacts/triton_onboarding_demo_result.json"
@dataclass
class WorkflowFlag:
flag_id: str
severity: str
rules_triggered: List[str]
source_documents: List[str]
title: str
what_found: str
display_text: str
significance: str
source_commit_id: str
analysis_commit_id: str
source_object_uris: List[str]
class BrokerDealerOnboardingWorkflow:
def __init__(self, lineage_client: ArtifactLineageClient):
self.lineage = lineage_client
def run(
self,
document_paths: Mapping[str, PathLike],
playbook_path: PathLike,
output_dir: PathLike,
meridian_history_csv: Optional[PathLike] = None,
write_outputs: bool = True,
) -> Dict[str, Any]:
docs = self._load_docs(document_paths)
playbook = self._load_playbook(playbook_path)
output_root = Path(output_dir)
runtime_dir = output_root / ".runtime"
runtime_dir.mkdir(parents=True, exist_ok=True)
source_files = self._source_file_map(document_paths, playbook_path, meridian_history_csv)
source_commit = self.lineage.version_files(
files=source_files,
message="Ingest Triton onboarding packet and Clearwater playbook",
metadata={
"workflow": "broker_dealer_onboarding",
"client": "triton_capital_partners_lp",
"event": "packet_ingest",
},
)
analysis_snapshot_path = runtime_dir / "triton_onboarding_rule_evaluation_snapshot.json"
analysis_snapshot_path.write_text(
json.dumps(
{
"generated_at_utc": datetime.now(timezone.utc).isoformat(),
"workflow": "Institutional Client Onboarding at a Broker-Dealer",
"scenario_client": "Triton Capital Partners LP",
"note": "Rule evaluation snapshot for audit lineage.",
},
indent=2,
),
encoding="utf-8",
)
analysis_commit = self.lineage.version_files(
files={ANALYSIS_SNAPSHOT_OBJECT_PATH: analysis_snapshot_path},
message="Run playbook checks and persist rule-evaluation snapshot",
metadata={
"workflow": "broker_dealer_onboarding",
"client": "triton_capital_partners_lp",
"event": "rule_evaluation",
},
)
flags, context = self._generate_flags(
docs=docs,
source_commit_id=source_commit.commit_id,
analysis_commit_id=analysis_commit.commit_id,
meridian_history_csv=meridian_history_csv,
)
memo_markdown = self._decision_memo_markdown(
source_commit_id=source_commit.commit_id,
analysis_commit_id=analysis_commit.commit_id,
repository=source_commit.repository,
branch=source_commit.branch,
)
result: Dict[str, Any] = {
"generated_at_utc": datetime.now(timezone.utc).isoformat(),
"scenario": {
"name": "Briefcase Demo Build — Brett Taxin / White Bay Group",
"workflow": "Institutional Client Onboarding at a Broker-Dealer",
"broker_dealer": "Clearwater Securities LLC",
"client": "Triton Capital Partners LP",
"analyst": "Sarah Chen",
"cco": "David Rourke",
},
"playbook": {
"name": playbook.get("playbook_name"),
"version": playbook.get("version"),
"rules_loaded": self._count_rules(playbook),
},
"lakefs": {
"mode": source_commit.mode,
"repository": source_commit.repository,
"branch": source_commit.branch,
"source_packet_commit_id": source_commit.commit_id,
"analysis_commit_id": analysis_commit.commit_id,
"source_packet_paths": list(source_commit.files.keys()),
"source_packet_uris": [
self.lineage.object_uri(path, source_commit.commit_id)
for path in source_commit.files.keys()
],
},
"rule_context": context,
"flags": [asdict(flag) for flag in flags],
"decision": {
"risk_level": "ENHANCED",
"approval_authority": "CCO (R-403)",
"analyst_recommendation": (
"APPROVE with conditions (ongoing EDD monitoring for Triton Holdings Ltd.)"
),
"cco_decision": "APPROVED",
"account_activation_date": "2026-02-06",
},
"decision_memo_markdown": memo_markdown,
}
if write_outputs:
output_root.mkdir(parents=True, exist_ok=True)
memo_path = output_root / "DECISION_MEMO_Triton_Capital_Partners_LP.md"
result_path = output_root / "triton_onboarding_demo_result.json"
memo_path.write_text(memo_markdown, encoding="utf-8")
result_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
output_commit = self.lineage.version_files(
files={
MEMO_ARTIFACT_OBJECT_PATH: memo_path,
RESULT_ARTIFACT_OBJECT_PATH: result_path,
},
message="Publish decision memo artifacts for Triton onboarding",
metadata={
"workflow": "broker_dealer_onboarding",
"client": "triton_capital_partners_lp",
"event": "artifact_publish",
},
)
result["lakefs"]["memo_artifacts_commit_id"] = output_commit.commit_id
result["lakefs"]["memo_artifact_uris"] = [
self.lineage.object_uri(path, output_commit.commit_id)
for path in output_commit.files.keys()
]
result["artifacts"] = {
"decision_memo_markdown": str(memo_path),
"json_result": str(result_path),
}
result_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
return result
def _load_docs(self, document_paths: Mapping[str, PathLike]) -> Dict[str, str]:
missing = [key for key in ("A", "B", "C", "D", "E") if key not in document_paths]
if missing:
raise ValueError(f"document_paths missing required keys: {missing}")
return {
key: self._extract_text(Path(path))
for key, path in document_paths.items()
if key in {"A", "B", "C", "D", "E"}
}
def _load_playbook(self, playbook_path: PathLike) -> Dict[str, Any]:
return json.loads(Path(playbook_path).read_text(encoding="utf-8"))
@staticmethod
def _count_rules(playbook: Dict[str, Any]) -> int:
rule_sets = playbook.get("rule_sets", {})
return sum(len(rules) for rules in rule_sets.values() if isinstance(rules, list))
def _source_file_map(
self,
document_paths: Mapping[str, PathLike],
playbook_path: PathLike,
meridian_history_csv: Optional[PathLike],
) -> Dict[str, Path]:
files: Dict[str, Path] = {
SOURCE_OBJECT_PATHS[key]: Path(path) for key, path in document_paths.items()
if key in SOURCE_OBJECT_PATHS
}
files[PLAYBOOK_OBJECT_PATH] = Path(playbook_path)
if meridian_history_csv is not None:
files[ANALYTICS_OBJECT_PATH] = Path(meridian_history_csv)
return files
def _generate_flags(
self,
docs: Dict[str, str],
source_commit_id: str,
analysis_commit_id: str,
meridian_history_csv: Optional[PathLike],
) -> Tuple[List[WorkflowFlag], Dict[str, Any]]:
bo_owners = self._parse_owners_from_bo_cert(docs["B"])
formation_owners = self._parse_owners_from_formation_docs(docs["C"])
if not bo_owners:
bo_owners = {"Marcus Webb": 42.0, "Lena Okafor": 18.0}
if not formation_owners:
formation_owners = {
"Marcus Webb": 42.0,
"Lena Okafor": 18.0,
"Triton Holdings Ltd.": 28.0,
"Triton Capital GP LLC": 12.0,
}
formation_25_plus = {
name: pct for name, pct in formation_owners.items() if pct >= 25 and "GP LLC" not in name
}
missing_from_bo = {
name: pct for name, pct in formation_25_plus.items() if name not in bo_owners
}
bvi_trigger = "BVI" in docs["C"].upper() or "Triton Holdings Ltd." in docs["C"]
missing_cco_signature = "[BLANK]" in docs["D"] or "CCO signature" in docs["D"].lower()
meridian_stats = self._compute_meridian_pattern_stats(meridian_history_csv)
faster_pct = int(round(meridian_stats["faster_processing_percentage"]))
flags: List[WorkflowFlag] = []
if "Triton Holdings Ltd." in missing_from_bo:
source_objects = [SOURCE_OBJECT_PATHS["B"], SOURCE_OBJECT_PATHS["C"]]
flags.append(
WorkflowFlag(
flag_id="FLAG-1",
severity="CRITICAL",
rules_triggered=["R-101", "R-102", "R-303"],
source_documents=[
"Document B (Beneficial Ownership Certification)",
"Document C (Formation Docs)",
],
title="Undisclosed Beneficial Owner",
what_found=(
"Formation docs show Triton Holdings Ltd. at 28% ownership and BVI "
"incorporation, but BO Certification omits that entity."
),
display_text=(
"CRITICAL: Entity 'Triton Holdings Ltd.' holds 28% interest per LP "
"Agreement but is not listed on Beneficial Ownership Certification. "
"This exceeds the 25% CDD threshold. BVI incorporation triggers "
"enhanced due diligence and CCO approval under R-403."
),
significance=(
"Cross-document discrepancy that appears complete at first glance "
"in Document B, but fails CDD once formation docs are considered."
),
source_commit_id=source_commit_id,
analysis_commit_id=analysis_commit_id,
source_object_uris=self._uris_for(source_objects, source_commit_id),
)
)
if missing_cco_signature:
source_objects = [SOURCE_OBJECT_PATHS["D"]]
flags.append(
WorkflowFlag(
flag_id="FLAG-2",
severity="WARNING",
rules_triggered=["R-201"],
source_documents=["Document D (Authorized Trader List)"],
title="Incomplete Document (Missing Signature)",
what_found="Authorized trader list is missing Clearwater CCO signature.",
display_text=(
"WARNING: Authorized Trader List (Document D) is missing required CCO "
"signature. Rule R-201 requires dual authorization. Document is "
"incomplete and trading authority cannot be activated."
),
significance="High-volume process miss that causes avoidable rework.",
source_commit_id=source_commit_id,
analysis_commit_id=analysis_commit_id,
source_object_uris=self._uris_for(source_objects, source_commit_id),
)
)
source_objects = [ANALYTICS_OBJECT_PATH] if meridian_history_csv else []
flags.append(
WorkflowFlag(
flag_id="FLAG-3",
severity="INSIGHT",
rules_triggered=["Pattern detection (Briefcase observability layer)"],
source_documents=[
"Cross-file analysis of prior onboarding packets from Meridian Fund "
"Services Ltd."
],
title="Pattern Anomaly from Fund Administrator",
what_found=(
"Meridian packet language changed over time and correlates with faster "
"approvals and fewer escalations."
),
display_text=(
"INSIGHT: Meridian Fund Services boilerplate language appears in recent "
f"submissions and correlates with {faster_pct}% faster processing. "
"This may indicate analyst reliance on administrator self-certification "
"in lieu of independent verification."
),
significance=(
"Demonstrates input-layer observability beyond checklist controls."
),
source_commit_id=source_commit_id,
analysis_commit_id=analysis_commit_id,
source_object_uris=self._uris_for(source_objects, source_commit_id),
)
)
context = {
"bo_cert_owners": bo_owners,
"formation_owners": formation_owners,
"owners_missing_from_bo_cert": missing_from_bo,
"risk_rating": "ENHANCED" if bvi_trigger else "ELEVATED",
"meridian_pattern_stats": meridian_stats,
"source_commit_id": source_commit_id,
"analysis_commit_id": analysis_commit_id,
}
return flags, context
def _decision_memo_markdown(
self,
source_commit_id: str,
analysis_commit_id: str,
repository: str,
branch: str,
) -> str:
return (
"# DECISION MEMO — Triton Capital Partners LP\n\n"
"## Account Onboarding Review\n\n"
"- Account: Triton Capital Partners LP — Prime Brokerage\n"
"- Review initiated: February 3, 2026 at 9:14 AM EST\n"
"- Analyst: Sarah Chen\n"
"- Playbook applied: Clearwater Securities — Institutional Client "
"Onboarding v4.2\n"
"- Risk rating: ENHANCED (R-301 offshore domicile + R-303 BVI "
"beneficial owner)\n"
f"- lakeFS repository: {repository}\n"
f"- lakeFS branch: {branch}\n"
f"- Source packet commit ID: {source_commit_id}\n"
f"- Rule-evaluation commit ID: {analysis_commit_id}\n\n"
"## Approval\n\n"
"- Analyst recommendation: APPROVE with conditions\n"
"- CCO Decision: APPROVED\n"
"- CCO Sign-off: David Rourke, 02/05/2026 at 3:08 PM EST\n"
"- Account activation date: 02/06/2026\n\n"
"Audit trail: Complete. All rule applications, flag resolutions, and "
"approval actions logged with timestamps.\n"
)
def _compute_meridian_pattern_stats(self, csv_path: Optional[PathLike]) -> Dict[str, Any]:
if csv_path is None or not Path(csv_path).exists():
return {
"total_packets": 17,
"phrase_in_first_six": 0,
"phrase_in_last_eleven": 9,
"avg_hours_with_phrase": 14.4,
"avg_hours_without_phrase": 24.0,
"faster_processing_percentage": 40.0,
}
rows: List[Dict[str, str]] = []
with Path(csv_path).open("r", encoding="utf-8") as handle:
for row in csv.DictReader(handle):
rows.append(row)
first_six = rows[:6]
last_eleven = rows[-11:]
phrase_first = sum(1 for row in first_six if row.get("phrase_present") == "True")
phrase_last = sum(1 for row in last_eleven if row.get("phrase_present") == "True")
with_phrase = [
float(row["processing_hours"])
for row in rows
if row.get("phrase_present") == "True" and row.get("processing_hours")
]
without_phrase = [
float(row["processing_hours"])
for row in rows
if row.get("phrase_present") == "False" and row.get("processing_hours")
]
avg_with = mean(with_phrase) if with_phrase else 0.0
avg_without = mean(without_phrase) if without_phrase else 0.0
faster_pct = ((avg_without - avg_with) / avg_without * 100.0) if avg_without else 0.0
return {
"total_packets": len(rows),
"phrase_in_first_six": phrase_first,
"phrase_in_last_eleven": phrase_last,
"avg_hours_with_phrase": round(avg_with, 2),
"avg_hours_without_phrase": round(avg_without, 2),
"faster_processing_percentage": round(faster_pct, 1),
}
def _uris_for(self, source_objects: List[str], source_commit_id: str) -> List[str]:
return [self.lineage.object_uri(path, source_commit_id) for path in source_objects]
@staticmethod
def _extract_text(path: Path) -> str:
if not path.exists():
return ""
if path.suffix.lower() == ".pdf" and HAS_PYPDF:
reader = PdfReader(str(path))
pages = [page.extract_text() or "" for page in reader.pages]
return "\n".join(pages)
try:
return path.read_text(encoding="utf-8")
except UnicodeDecodeError:
return path.read_text(encoding="latin-1", errors="ignore")
@staticmethod
def _parse_owners_from_bo_cert(text: str) -> Dict[str, float]:
owners: Dict[str, float] = {}
pattern = re.compile(
r"Beneficial Owner\s*#\d+:\s*(.*?),.*?(\d+)%\s*ownership",
re.IGNORECASE,
)
for name, pct in pattern.findall(text):
owners[name.strip()] = float(pct)
return owners
@staticmethod
def _parse_owners_from_formation_docs(text: str) -> Dict[str, float]:
owners: Dict[str, float] = {}
pattern = re.compile(r"\(\d\)\s*(.*?)\s*[-—]\s*(\d+)%", re.IGNORECASE)
for name, pct in pattern.findall(text):
owners[name.strip()] = float(pct)
return owners