use std::path::Path;
use anyhow::Result;
use rusqlite::Connection;
use serde::Serialize;
use sha2::{Digest, Sha256};
use crate::db;
use crate::paths::state::{StateLayout, TelemetryCostConfig, TelemetryCostModelConfig};
use crate::state::session_gates;
use super::host::{HostContextSnapshot, HostCostUsage};
/// Divisor that turns a `*_usd_per_million` pricing rate into a per-token cost.
const TOKENS_PER_MILLION: f64 = 1e6;
/// Serializable summary of the estimated runtime cost for a session and its focus item.
///
/// Within this module `status` is one of `"no_host_telemetry"`, `"unconfigured"`,
/// `"missing_model"`, `"model_unpriced"`, `"incomplete_pricing"`, or `"estimated"`.
/// Optional fields are omitted from the serialized output when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct TelemetryCostView {
/// Machine-readable status code describing how far estimation got.
pub(crate) status: &'static str,
/// Human-readable explanation of why no estimate was produced (`None` when estimated).
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) reason: Option<String>,
/// Model name reported by host telemetry, when known.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) model: Option<String>,
/// Estimated USD cost of the current session; `build_cost_view_for_focus` rounds it to six
/// decimal places.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_estimate_usd: Option<f64>,
/// Previously recorded cost for the focus item plus the current session estimate, in USD
/// (rounded to six decimal places by `build_cost_view_for_focus`).
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) focus_item_estimate_usd: Option<f64>,
/// Configured session warn threshold in USD, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_warn_threshold_usd: Option<f64>,
/// Configured per-focus-item warn threshold in USD, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) focus_item_warn_threshold_usd: Option<f64>,
/// Human-readable budget warnings; always serialized, possibly as an empty list.
pub(crate) alerts: Vec<String>,
}
impl TelemetryCostView {
    /// Builds a view that carries only a status code and the reason no estimate exists.
    ///
    /// Model, estimates, and thresholds are all left unset, and no alerts are attached.
    pub(crate) fn unavailable(status: &'static str, reason: impl Into<String>) -> Self {
        let explanation: String = reason.into();
        Self {
            status,
            reason: Some(explanation),
            alerts: Vec::new(),
            model: None,
            session_estimate_usd: None,
            focus_item_estimate_usd: None,
            session_warn_threshold_usd: None,
            focus_item_warn_threshold_usd: None,
        }
    }
}
/// A continuity item (execution gate, handoff title, or immediate action) that session costs
/// are attributed to.
#[derive(Debug, Clone)]
pub(crate) struct TelemetryFocusTarget {
/// Stable attribution key; `continuity_key` in this module builds it as `"<kind>:<sha256 hex>"`.
/// An empty key means no focus-item total is computed by `build_cost_view_for_focus`.
pub(crate) next_step_key: String,
/// Human-readable item text (trimmed when produced by `continuity_target`).
pub(crate) title: String,
}
/// Chooses the item that session cost should be attributed to.
///
/// Candidates are tried in priority order:
/// 1. the execution-gate anchor text;
/// 2. the handoff title;
/// 3. the first immediate action.
///
/// Whitespace-only candidates are skipped, and the chosen title is trimmed. Returns `None`
/// when every candidate is blank.
pub(crate) fn continuity_target(
    attention_anchor: Option<&session_gates::ExecutionGateAnchor>,
    handoff_title: &str,
    immediate_actions: &[String],
) -> Option<TelemetryFocusTarget> {
    let from_gate = attention_anchor
        .and_then(|anchor| normalized_continuity_text(&anchor.text))
        .map(|text| ("execution_gate", text));
    // Lower-priority sources are only evaluated when the earlier ones produced nothing.
    let from_title =
        || normalized_continuity_text(handoff_title).map(|text| ("handoff_title", text));
    let from_actions = || {
        immediate_actions
            .iter()
            .find_map(|action| normalized_continuity_text(action))
            .map(|text| ("handoff_immediate_action", text))
    };
    let (kind, text) = from_gate.or_else(from_title).or_else(from_actions)?;
    Some(build_continuity_target(kind, text))
}
/// Wraps an already-normalized `title` into a focus target keyed by `kind` and the title text.
fn build_continuity_target(kind: &str, title: &str) -> TelemetryFocusTarget {
    let next_step_key = continuity_key(kind, title);
    TelemetryFocusTarget {
        title: String::from(title),
        next_step_key,
    }
}
/// Derives a stable key of the form `"<kind>:<sha256 hex>"` for a continuity item.
///
/// The digest covers `kind`, a NUL separator, and `value`, so different kind/value splits of
/// the same bytes do not hash identically.
fn continuity_key(kind: &str, value: &str) -> String {
    let parts: [&[u8]; 3] = [kind.as_bytes(), &[0], value.as_bytes()];
    let mut digest = Sha256::new();
    for part in parts {
        digest.update(part);
    }
    format!("{kind}:{:x}", digest.finalize())
}
/// Trims surrounding whitespace; yields `None` when nothing remains.
fn normalized_continuity_text(text: &str) -> Option<&str> {
    Some(text.trim()).filter(|trimmed| !trimmed.is_empty())
}
pub(crate) fn build_cost_view_for_focus(
layout: &StateLayout,
locality_id: &str,
active_session_id: Option<&str>,
host_snapshot: Option<&HostContextSnapshot>,
focus: Option<&TelemetryFocusTarget>,
) -> Result<TelemetryCostView> {
let Some(host_snapshot) = host_snapshot else {
return Ok(TelemetryCostView::unavailable(
"no_host_telemetry",
"runtime telemetry is unavailable for this session",
));
};
let config = layout.effective_telemetry_cost_config(locality_id)?;
if !config.is_configured() {
return Ok(TelemetryCostView {
status: "unconfigured",
reason: Some(
"no [telemetry.cost] pricing is configured in the active profile or repo overlay"
.to_owned(),
),
model: host_snapshot.model_name.clone(),
session_estimate_usd: None,
focus_item_estimate_usd: None,
session_warn_threshold_usd: None,
focus_item_warn_threshold_usd: None,
alerts: Vec::new(),
});
}
let Some(model_name) = host_snapshot.model_name.as_deref() else {
return Ok(TelemetryCostView {
status: "missing_model",
reason: Some(format!(
"{} telemetry does not expose a model name; set CCD_HOST_MODEL or the runtime-specific model env var to enable cost estimation",
host_snapshot.host
)),
model: None,
session_estimate_usd: None,
focus_item_estimate_usd: None,
session_warn_threshold_usd: normalize_money(config.session_warn_usd),
focus_item_warn_threshold_usd: normalize_money(config.item_warn_usd),
alerts: Vec::new(),
});
};
let Some(rule) = matching_rule(&config, model_name) else {
return Ok(TelemetryCostView {
status: "model_unpriced",
reason: Some(format!(
"no pricing entry in [telemetry.cost] matches model `{model_name}`"
)),
model: Some(model_name.to_owned()),
session_estimate_usd: None,
focus_item_estimate_usd: None,
session_warn_threshold_usd: normalize_money(config.session_warn_usd),
focus_item_warn_threshold_usd: normalize_money(config.item_warn_usd),
alerts: Vec::new(),
});
};
let session_cost = match estimate_session_cost(rule, &host_snapshot.cost_usage) {
Ok(value) => value,
Err(reason) => {
return Ok(TelemetryCostView {
status: "incomplete_pricing",
reason: Some(reason),
model: Some(model_name.to_owned()),
session_estimate_usd: None,
focus_item_estimate_usd: None,
session_warn_threshold_usd: normalize_money(config.session_warn_usd),
focus_item_warn_threshold_usd: normalize_money(config.item_warn_usd),
alerts: Vec::new(),
});
}
};
let focus_item_estimate = if let Some(focus) = focus {
if focus.next_step_key.is_empty() {
None
} else {
Some(
db::telemetry_cost::sum_for_focus(
&layout.state_db_path(),
&focus.next_step_key,
active_session_id,
)? + session_cost,
)
}
} else {
None
};
let session_warn_threshold = normalize_money(config.session_warn_usd);
let focus_item_warn_threshold = normalize_money(config.item_warn_usd);
let mut alerts = Vec::new();
if let Some(threshold) = session_warn_threshold {
if session_cost >= threshold {
alerts.push(format!(
"session cost ${} exceeded the configured session budget ${}",
format_usd(session_cost),
format_usd(threshold)
));
}
}
if let (Some(focus_total), Some(threshold)) = (focus_item_estimate, focus_item_warn_threshold) {
if focus_total >= threshold {
let label = focus
.map(|item| format!("focus item `{}`", item.title))
.unwrap_or_else(|| "focus item".to_owned());
alerts.push(format!(
"{label} cost ${} exceeded the configured item budget ${}",
format_usd(focus_total),
format_usd(threshold)
));
}
}
Ok(TelemetryCostView {
status: "estimated",
reason: None,
model: Some(model_name.to_owned()),
session_estimate_usd: Some(round_money(session_cost)),
focus_item_estimate_usd: focus_item_estimate.map(round_money),
session_warn_threshold_usd: session_warn_threshold,
focus_item_warn_threshold_usd: focus_item_warn_threshold,
alerts,
})
}
/// Persists the current session's estimated cost, attributed to `focus` when one is given.
///
/// This is best-effort: it silently does nothing when any of the following holds:
/// - `session_id` is empty;
/// - no host snapshot is available for persistence;
/// - pricing is unconfigured;
/// - the host reports no model name;
/// - no pricing rule matches the model;
/// - the cost cannot be estimated.
///
/// Without a focus target the row is stored with an empty `next_step_key` and no title.
///
/// # Errors
///
/// Propagates failures from reading the host snapshot, loading the cost config, and inserting
/// the database row.
pub(crate) fn record_session_cost(
    conn: &Connection,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    session_id: &str,
    focus: Option<&TelemetryFocusTarget>,
) -> Result<()> {
    if session_id.is_empty() {
        return Ok(());
    }
    let Some(snapshot) = super::host::current_for_persistence(repo_root)? else {
        return Ok(());
    };
    let config = layout.effective_telemetry_cost_config(locality_id)?;
    if !config.is_configured() {
        return Ok(());
    }
    let Some(model_name) = snapshot.model_name.clone() else {
        return Ok(());
    };
    let Some(rule) = matching_rule(&config, &model_name) else {
        return Ok(());
    };
    let usage = &snapshot.cost_usage;
    // Invalid or incomplete pricing is skipped here rather than reported.
    let Ok(session_cost) = estimate_session_cost(rule, usage) else {
        return Ok(());
    };
    let (next_step_key, next_step_title) = match focus {
        Some(target) => (target.next_step_key.clone(), Some(target.title.clone())),
        None => (String::new(), None),
    };
    let record = db::telemetry_cost::TelemetryCostRecord {
        session_id: session_id.to_owned(),
        recorded_at_epoch_s: snapshot.observed_at_epoch_s,
        next_step_key,
        next_step_title,
        model: Some(model_name),
        session_cost_usd: round_money(session_cost),
        input_tokens: usage.input_tokens,
        output_tokens: usage.output_tokens,
        cache_creation_input_tokens: usage.cache_creation_input_tokens,
        cache_read_input_tokens: usage.cache_read_input_tokens,
        blended_total_tokens: usage.blended_total_tokens,
    };
    db::telemetry_cost::insert(conn, &record)?;
    Ok(())
}
/// Returns the first pricing rule whose name or alias matches `model_name`.
///
/// Rules are checked in configuration order, so earlier entries win when several match.
fn matching_rule<'a>(
    config: &'a TelemetryCostConfig,
    model_name: &str,
) -> Option<&'a TelemetryCostModelConfig> {
    let models: &'a [TelemetryCostModelConfig] = &config.models;
    models
        .iter()
        .find(|candidate| rule_matches(candidate, model_name))
}
/// True when the rule's `match_name` or any alias equals `model_name` after normalization
/// (surrounding whitespace trimmed, ASCII case folded).
fn rule_matches(rule: &TelemetryCostModelConfig, model_name: &str) -> bool {
    let wanted = normalize_model(model_name);
    std::iter::once(&rule.match_name)
        .chain(rule.aliases.iter())
        .any(|candidate| normalize_model(candidate) == wanted)
}
/// Estimates the USD cost of `usage` under `rule`.
///
/// Per-tier rates are used when at least one detailed tier has tokens and every tier with
/// tokens has a rate. Otherwise the blended rate is applied to `usage.total_tokens()`.
///
/// # Errors
///
/// Returns a human-readable message in any of these cases:
/// - any configured rate is negative or non-finite;
/// - the usage carries no token counts;
/// - neither pricing path is available.
fn estimate_session_cost(
    rule: &TelemetryCostModelConfig,
    usage: &HostCostUsage,
) -> std::result::Result<f64, String> {
    // Every configured rate must be valid, even for tiers this usage does not exercise.
    let configured_rates = [
        ("input_usd_per_million", rule.input_usd_per_million),
        ("output_usd_per_million", rule.output_usd_per_million),
        ("cache_write_usd_per_million", rule.cache_write_usd_per_million),
        ("cache_read_usd_per_million", rule.cache_read_usd_per_million),
        ("blended_usd_per_million", rule.blended_usd_per_million),
    ];
    for &(label, rate) in &configured_rates {
        validate_rate(label, rate)?;
    }
    if usage.is_empty() {
        return Err(
            "runtime telemetry did not expose usable token counts for cost estimation".to_owned(),
        );
    }
    // (tokens, per-million rate) for each detailed tier, in a fixed summation order.
    let tiers = [
        (usage.input_tokens, rule.input_usd_per_million),
        (usage.output_tokens, rule.output_usd_per_million),
        (usage.cache_creation_input_tokens, rule.cache_write_usd_per_million),
        (usage.cache_read_input_tokens, rule.cache_read_usd_per_million),
    ];
    let any_tier_used = tiers.iter().any(|&(tokens, _)| tokens != 0);
    let every_used_tier_priced = tiers
        .iter()
        .all(|&(tokens, rate)| has_rate_for_usage(tokens, rate));
    if any_tier_used && every_used_tier_priced {
        let detailed = tiers
            .iter()
            .map(|&(tokens, rate)| cost_component(tokens, rate))
            .sum::<f64>();
        return Ok(detailed);
    }
    match rule.blended_usd_per_million {
        Some(rate) => Ok((usage.total_tokens() as f64 * rate) / TOKENS_PER_MILLION),
        None => Err("pricing for the matched model is incomplete for the available token breakdown; configure per-tier rates or blended_usd_per_million".to_owned()),
    }
}
/// A tier is priceable when it has a rate or when it consumed no tokens at all.
fn has_rate_for_usage(tokens: u64, rate: Option<f64>) -> bool {
    rate.is_some() || tokens == 0
}
/// USD cost of `tokens` at a per-million `rate`; zero when the tier is unused or unpriced.
fn cost_component(tokens: u64, rate: Option<f64>) -> f64 {
    if tokens == 0 {
        return 0.0;
    }
    rate.map_or(0.0, |per_million| {
        (tokens as f64 * per_million) / TOKENS_PER_MILLION
    })
}
/// Rejects a configured rate that is negative or non-finite; an absent rate is fine.
fn validate_rate(label: &str, rate: Option<f64>) -> std::result::Result<(), String> {
    match rate {
        Some(value) if !value.is_finite() || value < 0.0 => Err(format!(
            "invalid [telemetry.cost] value for `{label}`; rates must be finite and non-negative"
        )),
        _ => Ok(()),
    }
}
/// Canonical form for model-name comparison: trimmed, with ASCII letters lowercased.
fn normalize_model(value: &str) -> String {
    let mut normalized = value.trim().to_owned();
    normalized.make_ascii_lowercase();
    normalized
}
/// Rounds a USD amount to the nearest millionth of a dollar.
fn round_money(value: f64) -> f64 {
    const MICROS_PER_DOLLAR: f64 = 1_000_000.0;
    let micros = (value * MICROS_PER_DOLLAR).round();
    micros / MICROS_PER_DOLLAR
}
fn normalize_money(value: Option<f64>) -> Option<f64> {
value.map(round_money)
}
/// Formats a USD amount without the `$` sign.
///
/// Amounts under one dollar in magnitude use four decimals; larger amounts use two.
pub(crate) fn format_usd(value: f64) -> String {
    let precision: usize = if value.abs() < 1.0 { 4 } else { 2 };
    format!("{value:.precision$}")
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::session_gates::{ExecutionGateAnchor, ExecutionGateStatus};

    /// Per-tier pricing for `gpt-5` (alias `gpt-5-2026`) with no blended fallback.
    fn pricing_rule() -> TelemetryCostModelConfig {
        TelemetryCostModelConfig {
            match_name: "gpt-5".to_owned(),
            aliases: vec!["gpt-5-2026".to_owned()],
            input_usd_per_million: Some(1.0),
            output_usd_per_million: Some(2.0),
            cache_write_usd_per_million: Some(3.0),
            cache_read_usd_per_million: Some(0.5),
            blended_usd_per_million: None,
        }
    }

    #[test]
    fn estimate_session_cost_uses_detailed_rates() {
        // 1.0 (input) + 1.0 (output) + 0.75 (cache write) + 1.0 (cache read) USD.
        let usage = HostCostUsage {
            input_tokens: 1_000_000,
            output_tokens: 500_000,
            cache_creation_input_tokens: 250_000,
            cache_read_input_tokens: 2_000_000,
            blended_total_tokens: None,
        };
        let cost = estimate_session_cost(&pricing_rule(), &usage).unwrap();
        assert!((cost - 3.75).abs() < 0.000_001);
    }

    #[test]
    fn estimate_session_cost_ignores_missing_rate_for_unused_tier() {
        let mut rule = pricing_rule();
        rule.cache_write_usd_per_million = None;
        let usage = HostCostUsage {
            input_tokens: 2_000_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
            blended_total_tokens: None,
        };
        let cost = estimate_session_cost(&rule, &usage).unwrap();
        assert!((cost - 2.0).abs() < 0.000_001);
    }

    #[test]
    fn estimate_session_cost_errors_when_used_tier_is_unpriced_without_blended() {
        let mut rule = pricing_rule();
        rule.cache_read_usd_per_million = None;
        let usage = HostCostUsage {
            input_tokens: 1_000,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 1_000,
            blended_total_tokens: None,
        };
        assert!(estimate_session_cost(&rule, &usage).is_err());
    }

    #[test]
    fn estimate_session_cost_rejects_invalid_rates() {
        let mut rule = pricing_rule();
        rule.output_usd_per_million = Some(-1.0);
        let usage = HostCostUsage {
            input_tokens: 10,
            output_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_read_input_tokens: 0,
            blended_total_tokens: None,
        };
        let err = estimate_session_cost(&rule, &usage).unwrap_err();
        assert!(err.contains("output_usd_per_million"));

        let mut rule = pricing_rule();
        rule.blended_usd_per_million = Some(f64::NAN);
        assert!(estimate_session_cost(&rule, &usage).is_err());
    }

    #[test]
    fn continuity_target_prefers_execution_gate_anchor() {
        let target = continuity_target(
            Some(&ExecutionGateAnchor {
                index: 1,
                text: "Ship the schema migration.".to_owned(),
                status: ExecutionGateStatus::Open,
            }),
            "Next Session: Runtime cleanup",
            &[String::from("Refresh the handoff.")],
        )
        .expect("continuity target");
        assert_eq!(target.title, "Ship the schema migration.");
        assert!(target.next_step_key.starts_with("execution_gate:"));
    }

    #[test]
    fn continuity_target_falls_back_to_handoff_title() {
        let target = continuity_target(
            None,
            "Next Session: Runtime cleanup",
            &[String::from("Refresh the handoff.")],
        )
        .expect("continuity target");
        assert_eq!(target.title, "Next Session: Runtime cleanup");
        assert!(target.next_step_key.starts_with("handoff_title:"));
    }

    #[test]
    fn continuity_target_falls_back_to_first_non_blank_immediate_action() {
        let target = continuity_target(
            None,
            "   ",
            &[String::from("  "), String::from("  Refresh the handoff.  ")],
        )
        .expect("continuity target");
        assert_eq!(target.title, "Refresh the handoff.");
        assert!(target.next_step_key.starts_with("handoff_immediate_action:"));
    }

    #[test]
    fn continuity_target_is_none_when_every_source_is_blank() {
        assert!(continuity_target(None, "", &[String::from(" ")]).is_none());
        assert!(continuity_target(None, "  ", &[]).is_none());
    }

    #[test]
    fn continuity_key_is_stable_sha256_hex() {
        let key = continuity_key("handoff_title", "x");
        assert_eq!(key, continuity_key("handoff_title", "x"));
        assert_eq!(key.len(), "handoff_title:".len() + 64);
    }

    #[test]
    fn estimate_session_cost_falls_back_to_blended_rate() {
        let mut rule = pricing_rule();
        rule.input_usd_per_million = None;
        rule.output_usd_per_million = None;
        rule.cache_write_usd_per_million = None;
        rule.cache_read_usd_per_million = None;
        rule.blended_usd_per_million = Some(2.0);
        let usage = HostCostUsage {
            input_tokens: 1_000_000,
            output_tokens: 500_000,
            cache_creation_input_tokens: 250_000,
            cache_read_input_tokens: 250_000,
            blended_total_tokens: None,
        };
        let cost = estimate_session_cost(&rule, &usage).unwrap();
        assert!((cost - 4.0).abs() < 0.000_001);
    }

    #[test]
    fn matching_rule_honors_aliases() {
        let config = TelemetryCostConfig {
            session_warn_usd: None,
            item_warn_usd: None,
            models: vec![pricing_rule()],
        };
        assert!(matching_rule(&config, "gpt-5-2026").is_some());
    }

    #[test]
    fn matching_rule_ignores_case_and_surrounding_whitespace() {
        let config = TelemetryCostConfig {
            session_warn_usd: None,
            item_warn_usd: None,
            models: vec![pricing_rule()],
        };
        assert!(matching_rule(&config, "  GPT-5 ").is_some());
        assert!(matching_rule(&config, "gpt-4").is_none());
    }

    #[test]
    fn format_usd_uses_extra_precision_below_one_dollar() {
        assert_eq!(format_usd(0.25), "0.2500");
        assert_eq!(format_usd(3.75), "3.75");
    }
}