use std::fs;
use chromaframe_sdk::{
AllowedIntervention, CandidateColor, CandidateRanking, CandidateScoreInput, DirectionalDelta,
ExtractionRequest, GoalVector, Lab, MeasurementEngine, MeasurementInput, MeasurementReport,
RegionExtractor, RegionObservation, RegionSet, RegionStatus, VisionReadinessReport,
build_report, capture_quality_report, decode_image, delta_e00, depth_proxy, directional_delta,
harshness, helper::HelperRegionExtractor, ita_degrees, michelson_lightness_contrast,
score_candidate, srgb_to_lab,
};
use crate::{
error::ToolFailure,
schema::{
self, AnalyzeImageInput, AnalyzeImageOutput, CandidateInput, CandidateRankingSummary,
ContrastSummary, EvidenceCardSummary, ExtractionSummary, ImageSummary, LabInput,
ManualRankInput, ManualRankOutput, MeasurementSummary, ReadinessInput, ReadinessOutput,
RegionSummary, SubjectSummary,
},
};
#[derive(Debug, Clone)]
pub struct ChromaFrameRuntime;
impl ChromaFrameRuntime {
#[must_use]
pub const fn from_env() -> Self {
Self
}
pub fn readiness(&self, input: &ReadinessInput) -> Result<ReadinessOutput, ToolFailure> {
let extractor = HelperRegionExtractor::from_env();
let readiness = extractor.readiness_report();
Ok(readiness_output(readiness, input.include_warnings))
}
pub fn rank_candidates(
&self,
input: &ManualRankInput,
) -> Result<ManualRankOutput, ToolFailure> {
validate_manual_rank_input(input)?;
let subject = subject_summary(input.skin_lab.into());
let rankings = rank_lab_candidates(
input.skin_lab.into(),
FeatureLabs {
brow: input.brow_lab.map(Into::into),
iris: input.iris_lab.map(Into::into),
sclera: input.sclera_lab.map(Into::into),
lip: input.lip_lab.map(Into::into),
hair: input.hair_lab.map(Into::into),
beard: input.beard_lab.map(Into::into),
},
input.goal_vector.clone().into(),
input.confidence,
&input.candidates,
input.limit,
)?;
Ok(ManualRankOutput {
status: "complete".to_string(),
subject,
rankings,
uncertainty: format!(
"Manual measurement confidence {:.2}; interpret as deterministic color-fit uncertainty only.",
input.confidence
),
})
}
pub async fn analyze_image(
&self,
input: &AnalyzeImageInput,
) -> Result<AnalyzeImageOutput, ToolFailure> {
validate_analyze_image_input(input)?;
reject_oversized_encoded_image(&input.image_path)?;
let image_bytes =
fs::read(&input.image_path).map_err(|_| ToolFailure::image_read_failed())?;
let normalized = decode_image(&image_bytes, false)
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let quality = capture_quality_report(&normalized)
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let extractor = HelperRegionExtractor::from_env();
let extraction_request = ExtractionRequest::default();
let regions = extractor
.extract_regions(&normalized, &extraction_request)
.await
.map_err(|error| ToolFailure::sdk(error.to_string()))?
.parse()
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let samples = regions
.to_manual_region_samples(&normalized)
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let candidates = input
.candidates
.iter()
.map(|candidate| {
CandidateColor::new(
&candidate.name,
candidate.srgb,
AllowedIntervention::HairColor,
)
})
.collect::<Vec<_>>();
let measurement = MeasurementEngine::measure(&MeasurementInput {
quality,
mode: normalized.measurement_mode,
goal_vector: input.goal_vector.clone().into(),
candidates,
samples,
})
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let sdk_report = build_report(measurement.clone())
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
Ok(AnalyzeImageOutput {
image: ImageSummary {
dimensions: [normalized.width, normalized.height],
measurement_mode: format!("{:?}", normalized.measurement_mode),
icc_status: normalized.icc_status,
metadata_retained: false,
},
extraction: extraction_summary(®ions),
measurement: measurement_summary(&measurement),
rankings: top_rankings(&measurement.rankings, input.limit),
evidence_cards: sdk_report
.evidence_cards
.into_iter()
.map(|card| EvidenceCardSummary {
kind: card.kind,
summary: card.summary,
})
.collect(),
warnings: collect_warnings(®ions),
limitations: vec![
"Hair is not measured without an approved semantic parser asset.".to_string(),
"Facial-hair evidence is conservative: beard/stubble may be approximate, low evidence, or not measured without a semantic classifier.".to_string(),
"Results are deterministic color measurements, rankings, and uncertainty; they are not beauty, attractiveness, race, age, medical, or identity claims.".to_string(),
],
})
}
}
#[derive(Debug, Clone, Copy)]
struct FeatureLabs {
brow: Option<Lab>,
iris: Option<Lab>,
sclera: Option<Lab>,
lip: Option<Lab>,
hair: Option<Lab>,
beard: Option<Lab>,
}
fn validate_manual_rank_input(input: &ManualRankInput) -> Result<(), ToolFailure> {
validate_lab("skin_lab", input.skin_lab.into())?;
validate_optional_lab("brow_lab", input.brow_lab)?;
validate_optional_lab("iris_lab", input.iris_lab)?;
validate_optional_lab("sclera_lab", input.sclera_lab)?;
validate_optional_lab("lip_lab", input.lip_lab)?;
validate_optional_lab("hair_lab", input.hair_lab)?;
validate_optional_lab("beard_lab", input.beard_lab)?;
validate_goal_vector(input.goal_vector.clone().into())?;
validate_confidence(input.confidence)?;
validate_candidates(&input.candidates)?;
schema::validate_limit(input.limit).map_err(ToolFailure::invalid_input)?;
Ok(())
}
fn validate_analyze_image_input(input: &AnalyzeImageInput) -> Result<(), ToolFailure> {
if !input.image_path.is_file() {
return Err(ToolFailure::image_read_failed());
}
validate_goal_vector(input.goal_vector.clone().into())?;
validate_candidates(&input.candidates)?;
schema::validate_limit(input.limit).map_err(ToolFailure::invalid_input)?;
Ok(())
}
fn reject_oversized_encoded_image(path: &std::path::Path) -> Result<(), ToolFailure> {
let metadata = fs::metadata(path).map_err(|_| ToolFailure::image_read_failed())?;
if metadata.len() <= schema::MAX_ENCODED_IMAGE_BYTES {
return Ok(());
}
Err(ToolFailure::invalid_input(format!(
"encoded image exceeds {} byte MCP intake limit",
schema::MAX_ENCODED_IMAGE_BYTES
)))
}
fn validate_goal_vector(goal_vector: GoalVector) -> Result<(), ToolFailure> {
goal_vector
.parse()
.map(|_| ())
.map_err(|error| ToolFailure::invalid_input(error.to_string()))
}
fn validate_candidates(candidates: &[CandidateInput]) -> Result<(), ToolFailure> {
schema::validate_candidate_count(candidates.len()).map_err(ToolFailure::invalid_input)?;
Ok(())
}
fn validate_optional_lab(field: &'static str, lab: Option<LabInput>) -> Result<(), ToolFailure> {
let Some(lab) = lab else {
return Ok(());
};
validate_lab(field, lab.into())
}
fn validate_lab(field: &'static str, lab: Lab) -> Result<(), ToolFailure> {
if lab.l.is_finite() && lab.a.is_finite() && lab.b.is_finite() {
return Ok(());
}
Err(ToolFailure::invalid_input(format!(
"{field} must contain finite Lab values"
)))
}
fn validate_confidence(confidence: f32) -> Result<(), ToolFailure> {
if confidence.is_finite() && (0.0..=1.0).contains(&confidence) {
return Ok(());
}
Err(ToolFailure::invalid_input(
"confidence must be finite and between 0 and 1",
))
}
fn rank_lab_candidates(
skin_lab: Lab,
features: FeatureLabs,
goal_vector: GoalVector,
confidence: f32,
candidates: &[CandidateInput],
limit: usize,
) -> Result<Vec<CandidateRankingSummary>, ToolFailure> {
let baseline_feature_michelson = [
features.brow,
features.iris,
features.sclera,
features.lip,
features.hair,
features.beard,
]
.into_iter()
.flatten()
.map(|lab| michelson_lightness_contrast(skin_lab, lab))
.max_by(f32::total_cmp);
let mut rankings = candidates
.iter()
.enumerate()
.map(|(index, candidate)| {
let candidate_lab = srgb_to_lab(candidate.srgb);
let candidate_harshness = harshness(delta_e00(candidate_lab, skin_lab))
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
let (score, _components) = score_candidate(CandidateScoreInput {
skin_lab,
candidate_lab,
lip_lab: features.lip,
iris_lab: features.iris,
sclera_lab: features.sclera,
hair_lab: features.hair,
brow_lab: features.brow,
baseline_feature_michelson,
goal_vector: goal_vector.clone(),
confidence,
})
.map_err(|error| ToolFailure::sdk(error.to_string()))?;
Ok((
index,
RawRankingSummary {
name: candidate.name.clone(),
score,
confidence,
harshness: candidate_harshness,
label: "Goal-specific fit with measured uncertainty".to_string(),
},
))
})
.collect::<Result<Vec<_>, ToolFailure>>()?;
rankings.sort_by(|left, right| {
right
.1
.score
.total_cmp(&left.1.score)
.then_with(|| left.1.harshness.total_cmp(&right.1.harshness))
.then_with(|| left.0.cmp(&right.0))
});
Ok(rankings
.into_iter()
.map(|(_, ranking)| ranking.into_output())
.take(limit)
.collect())
}
fn readiness_output(readiness: VisionReadinessReport, include_warnings: bool) -> ReadinessOutput {
ReadinessOutput {
sdk_available: true,
vision_helper_available: readiness.backend_available,
python_version: readiness.python_version,
missing_packages: readiness.missing_packages,
missing_models: readiness.missing_models,
warnings: if include_warnings {
readiness.warnings
} else {
Vec::new()
},
}
}
fn extraction_summary(regions: &RegionSet) -> ExtractionSummary {
ExtractionSummary {
backend: regions.extraction_quality.backend.clone(),
faces_detected: regions.extraction_quality.faces_detected,
selected_face_index: regions.extraction_quality.selected_face_index,
regions: regions.regions.iter().map(region_summary).collect(),
}
}
pub(crate) fn region_summary(region: &RegionObservation) -> RegionSummary {
let (status, reason) = match ®ion.status {
RegionStatus::Measured => ("measured".to_string(), None),
RegionStatus::Approximate => ("approximate".to_string(), region.approximate_reason.clone()),
RegionStatus::LowEvidence => (
"low_evidence".to_string(),
region
.approximate_reason
.clone()
.or_else(|| region.not_measured_reason.clone()),
),
RegionStatus::NotMeasured { reason } => ("not_measured".to_string(), Some(reason.clone())),
};
RegionSummary {
kind: format!("{:?}", region.kind),
status,
source: format!("{:?}", region.source),
confidence: round2(region.confidence),
sample_hint: region.sample_hint,
reason: reason.or_else(|| region.not_measured_reason.clone()),
}
}
fn measurement_summary(measurement: &MeasurementReport) -> MeasurementSummary {
MeasurementSummary {
status: format!("{:?}", measurement.status),
confidence: measurement
.rankings
.first()
.map(|ranking| round2(ranking.confidence)),
subject: measurement.subject.as_ref().map(|subject| SubjectSummary {
skin_lab: LabInput::from(subject.skin_lab),
skin_ita: round2(subject.skin_ita),
skin_depth_proxy: round2(subject.skin_depth_proxy),
}),
contrasts: measurement
.contrast_map
.contrasts
.iter()
.map(contrast_summary)
.collect(),
}
}
fn subject_summary(skin_lab: Lab) -> SubjectSummary {
SubjectSummary {
skin_lab: LabInput::from(skin_lab),
skin_ita: round2(ita_degrees(skin_lab)),
skin_depth_proxy: round2(depth_proxy(skin_lab)),
}
}
fn contrast_summary(contrast: &DirectionalDelta) -> ContrastSummary {
ContrastSummary {
from: contrast.from.clone(),
to: contrast.to.clone(),
delta_l: round2(contrast.delta_l),
delta_a: round2(contrast.delta_a),
delta_b: round2(contrast.delta_b),
delta_e00: round2(contrast.delta_e00),
michelson_lightness_contrast: round2(contrast.michelson_lightness_contrast),
}
}
fn top_rankings(rankings: &[CandidateRanking], limit: usize) -> Vec<CandidateRankingSummary> {
rankings
.iter()
.take(limit)
.map(|ranking| CandidateRankingSummary {
name: ranking.name.clone(),
score: round2(ranking.score),
confidence: round2(ranking.confidence),
harshness: round2(ranking.harshness),
label: ranking.label.clone(),
})
.collect()
}
fn collect_warnings(regions: &RegionSet) -> Vec<String> {
let mut warnings = regions.warnings.clone();
warnings.extend(regions.extraction_quality.warnings.clone());
warnings.sort();
warnings.dedup();
warnings
}
fn round2(value: f32) -> f32 {
(value * 100.0).round() / 100.0
}
#[derive(Debug, Clone)]
struct RawRankingSummary {
name: String,
score: f32,
confidence: f32,
harshness: f32,
label: String,
}
impl RawRankingSummary {
fn into_output(self) -> CandidateRankingSummary {
CandidateRankingSummary {
name: self.name,
score: round2(self.score),
confidence: round2(self.confidence),
harshness: round2(self.harshness),
label: self.label,
}
}
}
#[allow(dead_code)]
fn measured_contrasts(skin_lab: Lab, features: FeatureLabs) -> Vec<DirectionalDelta> {
[
("brow", features.brow),
("iris", features.iris),
("sclera", features.sclera),
("lip", features.lip),
("hair", features.hair),
("beard", features.beard),
]
.into_iter()
.filter_map(|(name, lab)| lab.map(|lab| directional_delta("skin", skin_lab, name, lab)))
.collect()
}