use std::collections::BTreeMap;
use datasynth_core::models::{IcPairId, JournalEntry};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use crate::errors::{GroupError, GroupResult};
use crate::manifest::builder::GroupManifest;
use crate::shard::ic_plan::{derive_ic_pair_plans, IcPairPlan, IcRole};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcMatchResult {
pub matched: Vec<IcMatchedPair>,
pub unmatched: Vec<UnmatchedSide>,
pub total_planned: usize,
pub coverage: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IcMatchedPair {
pub pair_id: IcPairId,
pub seller_entity: String,
pub buyer_entity: String,
pub seller_je: JournalEntry,
pub buyer_je: JournalEntry,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnmatchedSide {
pub pair_id: IcPairId,
pub present_role: IcRole,
pub present_entity: String,
pub present_je: JournalEntry,
pub reason: UnmatchedReason,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum UnmatchedReason {
MissingBuyerSide,
MissingSellerSide,
AmountDriftAboveTolerance,
}
pub fn match_ic_pairs(
manifest: &GroupManifest,
entity_jes: &[(String, Vec<JournalEntry>)],
) -> GroupResult<IcMatchResult> {
match_ic_pairs_consuming(manifest, entity_jes.to_vec())
}
pub fn match_ic_pairs_consuming(
manifest: &GroupManifest,
entity_jes: Vec<(String, Vec<JournalEntry>)>,
) -> GroupResult<IcMatchResult> {
let mut plan_cache: BTreeMap<String, BTreeMap<IcPairId, IcPairPlan>> = BTreeMap::new();
let mut by_pair: BTreeMap<IcPairId, Vec<ObservedSide>> = BTreeMap::new();
for (entity_code, jes) in entity_jes.into_iter() {
for je in jes.into_iter() {
let Some(pair_id) = je.header.ic_pair_id else {
continue;
};
by_pair.entry(pair_id).or_default().push(ObservedSide {
entity_code: entity_code.clone(),
je,
});
}
}
let mut matched: Vec<IcMatchedPair> = Vec::new();
let mut unmatched: Vec<UnmatchedSide> = Vec::new();
for (pair_id, mut sides) in by_pair.into_iter() {
match sides.len() {
1 => {
let side = sides.pop().expect("len == 1 checked");
let plan = lookup_plan(&mut plan_cache, manifest, &side.entity_code, &pair_id)?;
let reason = match plan.role {
IcRole::Seller => UnmatchedReason::MissingBuyerSide,
IcRole::Buyer => UnmatchedReason::MissingSellerSide,
};
unmatched.push(UnmatchedSide {
pair_id,
present_role: plan.role,
present_entity: side.entity_code,
present_je: side.je,
reason,
});
}
2 => {
let plan_a =
lookup_plan(&mut plan_cache, manifest, &sides[0].entity_code, &pair_id)?;
let plan_b =
lookup_plan(&mut plan_cache, manifest, &sides[1].entity_code, &pair_id)?;
let (seller_idx, buyer_idx) = match (plan_a.role, plan_b.role) {
(IcRole::Seller, IcRole::Buyer) => (0usize, 1usize),
(IcRole::Buyer, IcRole::Seller) => (1usize, 0usize),
(IcRole::Seller, IcRole::Seller) => {
return Err(GroupError::Aggregate(format!(
"match_ic_pairs: pair {} has two seller-side observations \
(entities `{}` and `{}`) — expected one seller + one buyer",
pair_id, sides[0].entity_code, sides[1].entity_code
)));
}
(IcRole::Buyer, IcRole::Buyer) => {
return Err(GroupError::Aggregate(format!(
"match_ic_pairs: pair {} has two buyer-side observations \
(entities `{}` and `{}`) — expected one seller + one buyer",
pair_id, sides[0].entity_code, sides[1].entity_code
)));
}
};
let drift_violation = if matches!(
manifest.matching.strategy,
crate::config::IcMatchingStrategy::EmergentFuzzy
) {
let seller_amount = sides[seller_idx].je.total_debit().abs();
let buyer_amount = sides[buyer_idx].je.total_debit().abs();
let max_amount = seller_amount.max(buyer_amount);
if max_amount > Decimal::ZERO {
let drift = (seller_amount - buyer_amount).abs();
let drift_ratio = drift / max_amount;
drift_ratio > manifest.matching.tolerance_percent
} else {
false
}
} else {
false
};
let (high_idx, low_idx) = if seller_idx > buyer_idx {
(seller_idx, buyer_idx)
} else {
(buyer_idx, seller_idx)
};
let high = sides.swap_remove(high_idx);
let low = sides.swap_remove(low_idx);
let (seller, buyer) = if seller_idx > buyer_idx {
(high, low)
} else {
(low, high)
};
if drift_violation {
unmatched.push(UnmatchedSide {
pair_id,
present_role: IcRole::Seller,
present_entity: seller.entity_code,
present_je: seller.je,
reason: UnmatchedReason::AmountDriftAboveTolerance,
});
unmatched.push(UnmatchedSide {
pair_id,
present_role: IcRole::Buyer,
present_entity: buyer.entity_code,
present_je: buyer.je,
reason: UnmatchedReason::AmountDriftAboveTolerance,
});
continue;
}
matched.push(IcMatchedPair {
pair_id,
seller_entity: seller.entity_code,
buyer_entity: buyer.entity_code,
seller_je: seller.je,
buyer_je: buyer.je,
});
}
n => {
let observed: Vec<String> = sides.iter().map(|s| s.entity_code.clone()).collect();
return Err(GroupError::Aggregate(format!(
"match_ic_pairs: pair {} has {} observed sides ({:?}) — \
expected at most 2 (one seller + one buyer)",
pair_id, n, observed
)));
}
}
}
matched.sort_by_key(|p| p.pair_id);
unmatched.sort_by_key(|p| p.pair_id);
let total_planned = total_planned_pairs(manifest);
let coverage = if total_planned == 0 {
0.0
} else {
matched.len() as f64 / total_planned as f64
};
Ok(IcMatchResult {
matched,
unmatched,
total_planned,
coverage,
})
}
struct ObservedSide {
entity_code: String,
je: JournalEntry,
}
fn total_planned_pairs(manifest: &GroupManifest) -> usize {
manifest
.ownership_graph
.entities
.iter()
.map(|e| {
derive_ic_pair_plans(manifest, &e.code)
.into_iter()
.filter(|p| p.role == IcRole::Seller)
.count()
})
.sum()
}
fn lookup_plan(
cache: &mut BTreeMap<String, BTreeMap<IcPairId, IcPairPlan>>,
manifest: &GroupManifest,
entity_code: &str,
pair_id: &IcPairId,
) -> GroupResult<IcPairPlan> {
if !cache.contains_key(entity_code) {
let plans = derive_ic_pair_plans(manifest, entity_code);
let by_pair: BTreeMap<IcPairId, IcPairPlan> =
plans.into_iter().map(|p| (p.pair_id, p)).collect();
cache.insert(entity_code.to_string(), by_pair);
}
let entity_plans = cache.get(entity_code).expect("just inserted");
entity_plans.get(pair_id).cloned().ok_or_else(|| {
GroupError::Aggregate(format!(
"match_ic_pairs: entity `{}` posted JE for pair {} but the manifest \
derives no plan with that pair_id for that entity — \
stale shard output or manifest mismatch",
entity_code, pair_id
))
})
}