use std::path::Path;
use anyhow::Result;
use serde::Serialize;
use serde_json::Value;
use crate::db;
use crate::state::compiled;
/// One named section of the outgoing context payload, paired with the reason
/// it was included; used to compute per-section telemetry.
#[derive(Clone)]
pub(crate) struct ContextSectionSpec {
/// Stable section name reported in diagnostics.
pub(crate) name: &'static str,
/// Why this section was included in the payload.
pub(crate) inclusion_reason: &'static str,
/// The section's JSON content.
pub(crate) value: Value,
}
/// Session identity and timing facts captured at observation time. All fields
/// are optional because a host may not report session metadata.
#[derive(Clone, Default)]
pub(crate) struct ContextSessionSnapshot {
/// Host-assigned session identifier, when known.
pub(crate) session_id: Option<String>,
/// Epoch seconds when the session started.
pub(crate) started_at_epoch_s: Option<u64>,
/// Epoch seconds of the most recent session (re)start.
pub(crate) last_started_at_epoch_s: Option<u64>,
/// How many times the session has started; feeds boundary classification.
pub(crate) start_count: Option<u32>,
}
/// Host-facing telemetry for a single context emission: payload size and hash
/// plus change detection against the previous emission and session timing.
/// Optional fields are omitted from the serialized JSON when absent.
#[derive(Clone, Serialize)]
pub(crate) struct ContextTelemetryView {
pub(crate) observed_at_epoch_s: u64,
/// Fingerprint of the inputs the context was built from.
pub(crate) source_fingerprint: String,
/// SHA-256 of the canonicalized payload JSON.
pub(crate) normalized_hash: String,
pub(crate) chars: u64,
/// Heuristic token estimate (~4 chars per token, rounded up).
pub(crate) estimated_tokens: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) previous_normalized_hash: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) previous_source_fingerprint: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) hash_changed_from_previous: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) source_changed_from_previous: Option<bool>,
/// "stable", "expected_change", or "unexpected_change" (hash churn with an
/// unchanged source fingerprint); `None` when there is no previous emission.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) churn_status: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) boundary_kind: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) host_total_context_chars: Option<u64>,
/// Payload chars divided by the host's total context chars, rounded to 6 dp.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) overhead_ratio: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_started_at_epoch_s: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_last_started_at_epoch_s: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_start_count: Option<u32>,
}
/// Detailed diagnostics for one emission, including per-section metrics.
#[derive(Clone, Serialize)]
pub(crate) struct ContextDiagnosticsView {
pub(crate) observed_at_epoch_s: u64,
/// SHA-256 of the canonicalized payload JSON.
pub(crate) normalized_hash: String,
pub(crate) chars: u64,
pub(crate) estimated_tokens: u64,
/// Size/hash metrics for each named payload section.
pub(crate) sections: Vec<ContextSectionTelemetryView>,
}
/// Size and hash metrics for a single named payload section.
#[derive(Clone, Serialize)]
pub(crate) struct ContextSectionTelemetryView {
pub(crate) name: &'static str,
pub(crate) inclusion_reason: &'static str,
/// SHA-256 of the canonicalized section JSON.
pub(crate) normalized_hash: String,
pub(crate) chars: u64,
pub(crate) estimated_tokens: u64,
}
/// Bundle produced by `build_context_observation`: the host-facing telemetry
/// view, the diagnostics view, and the database record to persist.
pub(crate) struct ContextObservation {
pub(crate) telemetry: ContextTelemetryView,
pub(crate) diagnostics: ContextDiagnosticsView,
pub(crate) record: db::host_loop::HostLoopEventRecord,
}
/// Top-level report over recent host-loop events. `status` is "missing" when
/// no telemetry exists for the workspace, otherwise "recorded".
#[derive(Clone, Serialize)]
pub(crate) struct HostLoopReportView {
pub(crate) status: &'static str,
/// Human-readable explanation when `status` is "missing".
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) latest_event: Option<HostLoopEventView>,
/// Most-recent-first event views.
pub(crate) recent_events: Vec<HostLoopEventView>,
pub(crate) summary: HostLoopSummaryView,
}
/// Aggregate statistics over recent context emissions. Percentiles use the
/// nearest-rank method over sorted payload sizes.
#[derive(Clone, Serialize)]
pub(crate) struct HostLoopSummaryView {
pub(crate) event_count: usize,
/// How many recent events carried a context payload (chars + hash present).
pub(crate) context_emission_count: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) p50_chars: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) p95_chars: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) max_chars: Option<u64>,
/// Fraction of adjacent emissions whose hash changed with a stable source.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) churn_rate: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) latest_overhead_ratio: Option<f64>,
}
/// Serializable view of one stored host-loop event; optional fields are
/// omitted from the JSON output when absent.
#[derive(Clone, Serialize)]
pub(crate) struct HostLoopEventView {
pub(crate) observed_at_epoch_s: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_id: Option<String>,
pub(crate) host: String,
pub(crate) hook: String,
pub(crate) status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_boundary_action: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) source_fingerprint: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) normalized_hash: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) chars: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) estimated_tokens: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) host_total_context_chars: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) overhead_ratio: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_start_count: Option<u32>,
}
/// Rough token estimate from a character count, using the common
/// 4-characters-per-token heuristic rounded up; saturates at `u64::MAX`.
pub(crate) fn estimate_tokens(chars: usize) -> u64 {
    let tokens = chars.div_ceil(4);
    match u64::try_from(tokens) {
        Ok(value) => value,
        Err(_) => u64::MAX,
    }
}
/// Builds a complete observation of one context emission: the host-facing
/// telemetry view, the per-section diagnostics view, and the database record
/// to persist.
///
/// The payload and each section are canonicalized via [`normalize_json`]
/// before hashing, so key order and incidental whitespace do not register as
/// churn. Churn status is derived by comparing against `previous`: `stable`
/// (hash unchanged), `expected_change` (hash and source fingerprint both
/// changed), or `unexpected_change` (hash changed, source did not).
///
/// # Errors
/// Returns an error if the context or any section value fails to serialize.
#[allow(clippy::too_many_arguments)]
pub(crate) fn build_context_observation(
    observed_at_epoch_s: u64,
    host: &str,
    hook: &str,
    status: &str,
    session_boundary_action: Option<&str>,
    source_fingerprint: &str,
    context: &Value,
    sections: &[ContextSectionSpec],
    previous: Option<&db::host_loop::HostLoopEventRecord>,
    session: &ContextSessionSnapshot,
    host_total_context_chars: Option<u64>,
) -> Result<ContextObservation> {
    let normalized_context = normalize_json(context)?;
    let normalized_hash = compiled::sha256_string(&normalized_context);
    // Counts saturate rather than overflow on pathological payload sizes.
    let chars = u64::try_from(normalized_context.chars().count()).unwrap_or(u64::MAX);
    let estimated_tokens = estimate_tokens(normalized_context.chars().count());
    let section_views = sections
        .iter()
        .map(|section| {
            // Fixed: `&section.value` (was corrupted to `§ion.value`).
            let normalized = normalize_json(&section.value)?;
            let chars = normalized.chars().count();
            Ok(ContextSectionTelemetryView {
                name: section.name,
                inclusion_reason: section.inclusion_reason,
                normalized_hash: compiled::sha256_string(&normalized),
                chars: u64::try_from(chars).unwrap_or(u64::MAX),
                estimated_tokens: estimate_tokens(chars),
            })
        })
        .collect::<Result<Vec<_>>>()?;
    let previous_normalized_hash = previous.and_then(|item| item.normalized_payload_hash.clone());
    let previous_source_fingerprint = previous.and_then(|item| item.source_fingerprint.clone());
    let hash_changed_from_previous = previous_normalized_hash
        .as_deref()
        .map(|previous_hash| previous_hash != normalized_hash);
    let source_changed_from_previous = previous_source_fingerprint
        .as_deref()
        .map(|previous_source| previous_source != source_fingerprint);
    // A hash change with an unchanged source is the "unexpected" case the
    // churn-rate report tracks.
    let churn_status = match (hash_changed_from_previous, source_changed_from_previous) {
        (Some(false), _) => Some("stable"),
        (Some(true), Some(true)) => Some("expected_change"),
        (Some(true), Some(false)) => Some("unexpected_change"),
        _ => None,
    };
    let boundary_kind = boundary_kind(hook, session.start_count);
    // Overhead is only meaningful against a non-zero host context total.
    let overhead_ratio = host_total_context_chars
        .filter(|value| *value > 0)
        .map(|total_chars| round_ratio((chars as f64) / (total_chars as f64)));
    // Serialize section metrics first so `section_views` can be moved into the
    // diagnostics view without cloning the whole vector.
    let section_metrics_json = serde_json::to_string(&section_views)?;
    let diagnostics = ContextDiagnosticsView {
        observed_at_epoch_s,
        normalized_hash: normalized_hash.clone(),
        chars,
        estimated_tokens,
        sections: section_views,
    };
    let telemetry = ContextTelemetryView {
        observed_at_epoch_s,
        source_fingerprint: source_fingerprint.to_owned(),
        normalized_hash: normalized_hash.clone(),
        chars,
        estimated_tokens,
        previous_normalized_hash,
        previous_source_fingerprint,
        hash_changed_from_previous,
        source_changed_from_previous,
        churn_status,
        boundary_kind,
        host_total_context_chars,
        overhead_ratio,
        session_started_at_epoch_s: session.started_at_epoch_s,
        session_last_started_at_epoch_s: session.last_started_at_epoch_s,
        session_start_count: session.start_count,
    };
    let record = db::host_loop::HostLoopEventRecord {
        _id: None,
        observed_at_epoch_s,
        session_id: session.session_id.clone(),
        host: host.to_owned(),
        hook: hook.to_owned(),
        status: status.to_owned(),
        session_boundary_action: session_boundary_action.map(str::to_owned),
        source_fingerprint: Some(source_fingerprint.to_owned()),
        normalized_payload_hash: Some(normalized_hash),
        payload_chars: Some(chars),
        payload_estimated_tokens: Some(estimated_tokens),
        host_total_context_chars,
        overhead_ratio,
        session_started_at_epoch_s: session.started_at_epoch_s,
        session_last_started_at_epoch_s: session.last_started_at_epoch_s,
        session_start_count: session.start_count,
        section_metrics_json: Some(section_metrics_json),
    };
    Ok(ContextObservation {
        telemetry,
        diagnostics,
        record,
    })
}
/// Summarizes recently recorded host-loop telemetry for reporting.
///
/// Produces a "missing" report when no events have been stored; otherwise
/// computes percentile, churn, and overhead statistics over the rows that
/// actually carried a context payload (chars and hash both present).
pub(crate) fn build_report(path: &Path, limit: usize) -> Result<HostLoopReportView> {
    let recent = db::host_loop::list_recent(path, limit)?;
    if recent.is_empty() {
        let empty_summary = HostLoopSummaryView {
            event_count: 0,
            context_emission_count: 0,
            p50_chars: None,
            p95_chars: None,
            max_chars: None,
            churn_rate: None,
            latest_overhead_ratio: None,
        };
        return Ok(HostLoopReportView {
            status: "missing",
            reason: Some(
                "no host-loop telemetry has been recorded for this workspace yet".to_owned(),
            ),
            latest_event: None,
            recent_events: Vec::new(),
            summary: empty_summary,
        });
    }
    // Rows are ordered most-recent-first; keep only rows with a payload.
    let context_rows: Vec<_> = recent
        .iter()
        .filter(|row| row.payload_chars.is_some() && row.normalized_payload_hash.is_some())
        .cloned()
        .collect();
    let mut payload_sizes: Vec<u64> = context_rows
        .iter()
        .filter_map(|row| row.payload_chars)
        .collect();
    payload_sizes.sort_unstable();
    let summary = HostLoopSummaryView {
        event_count: recent.len(),
        context_emission_count: context_rows.len(),
        p50_chars: percentile(&payload_sizes, 0.50),
        p95_chars: percentile(&payload_sizes, 0.95),
        max_chars: payload_sizes.last().copied(),
        churn_rate: compute_churn_rate(&context_rows),
        // First row (most recent) that recorded an overhead ratio.
        latest_overhead_ratio: context_rows.iter().find_map(|row| row.overhead_ratio),
    };
    let recent_events: Vec<_> = recent.into_iter().map(into_event_view).collect();
    Ok(HostLoopReportView {
        status: "recorded",
        reason: None,
        latest_event: recent_events.first().cloned(),
        recent_events,
        summary,
    })
}
/// Serializes `value` in canonical form — object keys sorted recursively and
/// string whitespace normalized — so equal payloads hash identically.
pub(crate) fn normalize_json(value: &Value) -> Result<String> {
    let canonical = canonicalize_value(value);
    let rendered = serde_json::to_string(&canonical)?;
    Ok(rendered)
}
/// Converts a stored telemetry record into its serializable report view.
fn into_event_view(record: db::host_loop::HostLoopEventRecord) -> HostLoopEventView {
    let db::host_loop::HostLoopEventRecord {
        observed_at_epoch_s,
        session_id,
        host,
        hook,
        status,
        session_boundary_action,
        source_fingerprint,
        normalized_payload_hash,
        payload_chars,
        payload_estimated_tokens,
        host_total_context_chars,
        overhead_ratio,
        session_start_count,
        ..
    } = record;
    HostLoopEventView {
        observed_at_epoch_s,
        session_id,
        host,
        hook,
        status,
        session_boundary_action,
        source_fingerprint,
        normalized_hash: normalized_payload_hash,
        chars: payload_chars,
        estimated_tokens: payload_estimated_tokens,
        host_total_context_chars,
        // Round for display; stored ratios are already rounded at write time.
        overhead_ratio: overhead_ratio.map(round_ratio),
        session_start_count,
    }
}
/// Recursively canonicalizes a JSON value for stable hashing:
/// * object keys are emitted in sorted order,
/// * strings get trailing-whitespace / blank-line normalization,
/// * arrays are canonicalized element-wise; other scalars pass through.
fn canonicalize_value(value: &Value) -> Value {
    match value {
        Value::Array(items) => Value::Array(items.iter().map(canonicalize_value).collect()),
        Value::Object(map) => {
            // Sort the entries once and rebuild the map from the sorted pairs.
            // This avoids the original's per-key `map[&key]` re-lookup and the
            // second clone of every key.
            let mut entries: Vec<(&String, &Value)> = map.iter().collect();
            entries.sort_by(|left, right| left.0.cmp(right.0));
            Value::Object(
                entries
                    .into_iter()
                    .map(|(key, item)| (key.clone(), canonicalize_value(item)))
                    .collect(),
            )
        }
        Value::String(text) => Value::String(normalize_text(text)),
        _ => value.clone(),
    }
}
/// Normalizes free-form text for hashing: strips trailing whitespace from
/// every line, collapses each run of blank lines to a single blank line, and
/// drops trailing blank lines entirely.
fn normalize_text(text: &str) -> String {
    let mut trimmed_lines: Vec<&str> = text.lines().map(str::trim_end).collect();
    // Keep only the first blank line of each consecutive run of blanks.
    trimmed_lines.dedup_by(|current, previous| current.is_empty() && previous.is_empty());
    trimmed_lines.join("\n").trim_end().to_owned()
}
/// Classifies a hook invocation relative to the session lifecycle:
/// `on_session_start` counts 0/1 are a fresh start, higher counts a resume or
/// refresh, and an unknown count a generic startup; `before_prompt_build` is
/// steady state; any other hook is unclassified.
fn boundary_kind(hook: &str, session_start_count: Option<u32>) -> Option<&'static str> {
    match (hook, session_start_count) {
        ("on_session_start", Some(count)) if count <= 1 => Some("fresh_start"),
        ("on_session_start", Some(_)) => Some("resume_or_refresh"),
        ("on_session_start", None) => Some("startup"),
        ("before_prompt_build", _) => Some("steady_state"),
        _ => None,
    }
}
/// Nearest-rank percentile over a pre-sorted slice; `None` when empty.
/// `quantile` is expected in [0, 1]; the rank is rounded to the nearest index
/// and clamped to the slice bounds.
fn percentile(values: &[u64], quantile: f64) -> Option<f64> {
    let last_index = values.len().checked_sub(1)?;
    let rank = ((last_index as f64) * quantile).round() as usize;
    values.get(rank.min(last_index)).map(|value| *value as f64)
}
/// Fraction of adjacent context emissions whose normalized payload hash
/// changed while the source fingerprint stayed the same ("unexpected" churn).
/// Pairs missing a hash on either side are excluded from the denominator;
/// returns `None` when no comparable pair exists.
fn compute_churn_rate(rows: &[db::host_loop::HostLoopEventRecord]) -> Option<f64> {
    if rows.len() < 2 {
        return None;
    }
    let mut churned = 0u64;
    let mut compared = 0u64;
    // `rows` is ordered most-recent-first; each window pairs an event with its
    // chronological predecessor. The hash/fingerprint comparison is symmetric,
    // so we walk the slice directly instead of building a reversed copy.
    for pair in rows.windows(2) {
        let (current, previous) = (&pair[0], &pair[1]);
        let (Some(previous_hash), Some(current_hash)) = (
            previous.normalized_payload_hash.as_deref(),
            current.normalized_payload_hash.as_deref(),
        ) else {
            continue;
        };
        compared += 1;
        let same_source = previous.source_fingerprint.is_some()
            && previous.source_fingerprint == current.source_fingerprint;
        if previous_hash != current_hash && same_source {
            churned += 1;
        }
    }
    (compared != 0).then(|| round_ratio((churned as f64) / (compared as f64)))
}
/// Rounds a ratio to six decimal places for stable reporting output.
fn round_ratio(value: f64) -> f64 {
    const SCALE: f64 = 1_000_000.0;
    (value * SCALE).round() / SCALE
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// Canonicalization: keys sorted recursively; inside string values, trailing
// whitespace is stripped and runs of blank lines collapse to one.
#[test]
fn normalize_json_sorts_keys_and_trims_trailing_whitespace() {
let value = json!({
"b": "line 1 \n\n\nline 2 \n",
"a": {"z": 1, "y": 2}
});
let normalized = normalize_json(&value).unwrap();
assert_eq!(normalized, r#"{"a":{"y":2,"z":1},"b":"line 1\n\nline 2"}"#);
}
// A payload hash change while the source fingerprint stays "fp_1" must be
// classified as unexpected churn. The expected overhead ratio is the
// 23-char canonical payload `{"task":"trim payload"}` over the host total
// of 200 chars: 23 / 200 = 0.115.
#[test]
fn build_context_observation_marks_unexpected_churn_when_source_is_stable() {
let previous = db::host_loop::HostLoopEventRecord {
_id: Some(1),
observed_at_epoch_s: 10,
session_id: Some("ses_1".to_owned()),
host: "codex".to_owned(),
hook: "before_prompt_build".to_owned(),
status: "bounded_context".to_owned(),
session_boundary_action: Some("continue".to_owned()),
source_fingerprint: Some("fp_1".to_owned()),
normalized_payload_hash: Some("old_hash".to_owned()),
payload_chars: Some(100),
payload_estimated_tokens: Some(25),
host_total_context_chars: None,
overhead_ratio: None,
session_started_at_epoch_s: Some(1),
session_last_started_at_epoch_s: Some(2),
session_start_count: Some(2),
section_metrics_json: Some("[]".to_owned()),
};
let observation = build_context_observation(
20,
"codex",
"before_prompt_build",
"bounded_context",
Some("continue"),
"fp_1",
&json!({"task": "trim payload"}),
&[ContextSectionSpec {
name: "task",
inclusion_reason: "always_include_task",
value: json!("trim payload"),
}],
Some(&previous),
&ContextSessionSnapshot {
session_id: Some("ses_1".to_owned()),
started_at_epoch_s: Some(1),
last_started_at_epoch_s: Some(2),
start_count: Some(2),
},
Some(200),
)
.unwrap();
assert_eq!(
observation.telemetry.churn_status,
Some("unexpected_change")
);
assert_eq!(observation.telemetry.overhead_ratio, Some(0.115));
}
}