use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tt_telemetry::audit::{Actor, AuditError, AuditWriter};
use uuid::Uuid;
use crate::types::{PlanResult, ProposedRoute};
/// Errors that can be returned while applying a plan.
#[derive(Debug, Error)]
pub enum ApplyError {
/// No row exists for the requested plan id; [`apply_plan`] returns this
/// when the store reports `None`.
#[error("plan not found")]
NotFound,
/// The plan's previous status was something other than `"projected"`, so
/// [`apply_plan`] rejects the request.
#[error("plan is in terminal state '{state}', cannot re-apply")]
InvalidState {
/// Status string the store reported for the plan.
state: String,
},
/// A store-side failure, carried as a message. In this file it is also used
/// for a poisoned lock and for audit-payload serialization failures.
#[error("store: {0}")]
Store(String),
/// The audit writer failed; converted automatically from [`AuditError`].
#[error("audit: {0}")]
Audit(#[from] AuditError),
}
/// Persistence interface used by [`apply_plan`] to flip a plan to applied.
#[async_trait]
pub trait PlanStore: Send + Sync {
/// Records `plan_id` as applied at `applied_at` together with `routes`.
///
/// Returns the status the plan had before the call, or `Ok(None)` when no
/// row exists for `plan_id`. [`apply_plan`] treats only a previous status of
/// `"projected"` as success, so implementations are presumably expected to
/// leave rows in any other status untouched (the in-memory store in this
/// file does) — confirm against other implementations.
///
/// # Errors
///
/// Returns an [`ApplyError`] when the write cannot be performed.
async fn mark_applied(
&self,
plan_id: Uuid,
applied_at: DateTime<Utc>,
routes: &[ProposedRoute],
) -> Result<Option<String>, ApplyError>;
}
/// In-memory [`PlanStore`] keyed by plan id, with an optional simulated
/// write failure.
#[derive(Default)]
pub struct InMemoryPlanStore {
/// Plan rows by id, behind a shared mutex.
rows: Arc<Mutex<HashMap<Uuid, InMemoryRow>>>,
/// When `true`, `mark_applied` fails with [`ApplyError::Store`] before
/// touching `rows`.
fail_route_write: bool,
}
/// State held for one plan by [`InMemoryPlanStore`].
#[derive(Debug, Clone)]
struct InMemoryRow {
/// Lifecycle status string; this file uses `"projected"` and `"applied"`.
status: String,
/// Timestamp passed to `mark_applied`; rows are seeded with `None`.
applied_at: Option<DateTime<Utc>>,
/// Routes stored by `mark_applied`; rows are seeded with an empty list.
applied_routes: Vec<ProposedRoute>,
}
impl InMemoryPlanStore {
pub fn new() -> Self {
Self::default()
}
pub fn with_failing_route_write() -> Self {
Self {
fail_route_write: true,
..Self::default()
}
}
pub fn seed_projected(&self) -> Uuid {
let id = Uuid::now_v7();
let mut g = self.rows.lock().expect("rows lock");
g.insert(
id,
InMemoryRow {
status: "projected".into(),
applied_at: None,
applied_routes: Vec::new(),
},
);
id
}
pub fn status(&self, plan_id: Uuid) -> Option<String> {
let g = self.rows.lock().expect("rows lock");
g.get(&plan_id).map(|r| r.status.clone())
}
pub fn applied_at(&self, plan_id: Uuid) -> Option<DateTime<Utc>> {
let g = self.rows.lock().expect("rows lock");
g.get(&plan_id).and_then(|r| r.applied_at)
}
pub fn applied_routes(&self, plan_id: Uuid) -> Option<Vec<ProposedRoute>> {
let g = self.rows.lock().expect("rows lock");
g.get(&plan_id).map(|r| r.applied_routes.clone())
}
}
#[async_trait]
impl PlanStore for InMemoryPlanStore {
    /// Flips a `"projected"` row to `"applied"`, storing `applied_at` and a
    /// copy of `routes`, and returns the status the row had before the call.
    ///
    /// Rows in any other status are left untouched and their current status is
    /// returned. `Ok(None)` means no row exists for `plan_id`.
    ///
    /// # Errors
    ///
    /// Returns [`ApplyError::Store`] when the store was built with the
    /// simulated failure flag (checked before any lookup) or when the row
    /// lock is poisoned.
    async fn mark_applied(
        &self,
        plan_id: Uuid,
        applied_at: DateTime<Utc>,
        routes: &[ProposedRoute],
    ) -> Result<Option<String>, ApplyError> {
        if self.fail_route_write {
            return Err(ApplyError::Store("simulated route-write failure".into()));
        }

        let mut table = match self.rows.lock() {
            Ok(guard) => guard,
            Err(poisoned) => return Err(ApplyError::Store(poisoned.to_string())),
        };

        match table.get_mut(&plan_id) {
            None => Ok(None),
            Some(row) if row.status == "projected" => {
                // Swap the status in place; the value swapped out is exactly
                // the previous status the caller needs.
                let previous = std::mem::replace(&mut row.status, "applied".into());
                row.applied_at = Some(applied_at);
                row.applied_routes = routes.to_vec();
                Ok(Some(previous))
            }
            Some(row) => Ok(Some(row.status.clone())),
        }
    }
}
/// JSON payload attached to the `plan.applied` audit event written by
/// [`apply_plan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ApplyPayload {
/// Id of the applied plan.
plan_id: Uuid,
/// Application time formatted as an RFC 3339 string.
applied_at: String,
/// Copied from the plan result's `sample_size`.
sample_size: u32,
/// Copied from the plan result's `aggregates.projected_savings_usd`.
projected_savings_usd: f64,
}
/// Marks the plan described by `result` as applied and records a
/// `plan.applied` audit event for `result.org_id` on behalf of `actor`.
///
/// The current UTC time is taken once and used both for the store update and
/// for the audit payload. The store is asked to persist the plan's proposed
/// routes in the same call that flips the status.
///
/// Note that the store call completes before the audit event is serialized
/// and written; this function performs no rollback if one of those later
/// steps fails.
///
/// # Errors
///
/// * [`ApplyError::NotFound`] — the store has no row for `result.plan_id`.
/// * [`ApplyError::InvalidState`] — the plan's previous status was not
///   `"projected"`.
/// * [`ApplyError::Store`] — the store call failed, or the audit payload could
///   not be serialized.
/// * [`ApplyError::Audit`] — the audit writer returned an error.
pub async fn apply_plan<S, A>(
    store: &S,
    audit_writer: &A,
    result: &PlanResult,
    actor: Actor,
) -> Result<(), ApplyError>
where
    S: PlanStore,
    A: AuditWriter,
{
    let applied_at = Utc::now();

    let Some(previous) = store
        .mark_applied(result.plan_id, applied_at, &result.proposed_routes)
        .await?
    else {
        return Err(ApplyError::NotFound);
    };
    if previous != "projected" {
        return Err(ApplyError::InvalidState { state: previous });
    }

    let event_body = serde_json::to_value(ApplyPayload {
        plan_id: result.plan_id,
        applied_at: applied_at.to_rfc3339(),
        sample_size: result.sample_size,
        projected_savings_usd: result.aggregates.projected_savings_usd,
    })
    .map_err(|e| ApplyError::Store(format!("serialize payload: {e}")))?;

    audit_writer
        .write(result.org_id, actor, "plan.applied".to_string(), event_body)
        .await?;

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Aggregates, ConfidenceIntervals, PlanResult, RouteAction, RouteConditions};
    use chrono::Utc;
    use tt_telemetry::audit::{verify_chain, InMemoryAuditWriter};

    /// Builds a one-element route list: an enabled, priority-100 route whose
    /// condition lists `claude-sonnet-4-5` and whose target is
    /// `claude-haiku-4-5`.
    fn sample_routes() -> Vec<ProposedRoute> {
        let when = RouteConditions {
            model_in: vec!["claude-sonnet-4-5".into()],
            input_tokens_lt: Some(2_000),
            input_tokens_gt: None,
            tag_equals: None,
            has_images: None,
            has_audio: None,
            prompt_contains_any_of: vec![],
            estimated_cost_gt: None,
            estimated_cost_lt: None,
        };
        let then = RouteAction {
            target_model: "claude-haiku-4-5".into(),
            fallbacks: Vec::new(),
            disable_cache: false,
            max_cost_usd: None,
        };
        vec![ProposedRoute {
            id: Uuid::now_v7(),
            name: "haiku-for-cheap-classification".into(),
            priority: 100,
            enabled: true,
            when,
            then,
        }]
    }

    /// Plan result fixture carrying the routes from [`sample_routes`].
    fn make_plan_result(plan_id: Uuid, org_id: Uuid) -> PlanResult {
        let routes = sample_routes();
        make_plan_result_with_routes(plan_id, org_id, routes)
    }

    /// Plan result fixture with fixed aggregates and caller-supplied routes.
    fn make_plan_result_with_routes(
        plan_id: Uuid,
        org_id: Uuid,
        proposed_routes: Vec<ProposedRoute>,
    ) -> PlanResult {
        let aggregates = Aggregates {
            total_baseline_cost_usd: 10.0,
            total_projected_cost_usd: 6.0,
            projected_savings_usd: 4.0,
            projected_savings_pct: 40.0,
            cache_hit_rate_projected: 0.0,
            p50_latency_ms_projected: 100.0,
            p95_latency_ms_projected: 250.0,
            requests_rerouted: 50,
            requests_unchanged: 50,
            requests_unprice_able: 0,
            l2_projections: Vec::new(),
            l2_poisoning_candidates: 0,
        };
        let confidence_intervals = ConfidenceIntervals {
            savings_usd_95: (3.5, 4.5),
            savings_pct_95: (35.0, 45.0),
            cache_hit_rate_95: (0.0, 0.0),
            p50_latency_ms_95: (90.0, 110.0),
            p95_latency_ms_95: (200.0, 300.0),
        };
        PlanResult {
            plan_id,
            org_id,
            window_start: Utc::now(),
            window_end: Utc::now(),
            sample_size: 100,
            aggregates,
            confidence_intervals,
            per_route_breakdown: Vec::new(),
            caveats: Vec::new(),
            quality: None,
            proposed_routes,
        }
    }

    /// Seeds one projected plan into `store` and pairs it with a fresh org id.
    /// Returns `(plan_id, org_id)`.
    fn seed_plan(store: &InMemoryPlanStore) -> (Uuid, Uuid) {
        let plan_id = store.seed_projected();
        (plan_id, Uuid::now_v7())
    }

    /// Happy path: status flips, routes are stored, one verifiable audit entry
    /// is written.
    #[tokio::test]
    async fn apply_marks_row_applied_and_emits_audit() {
        let plans = InMemoryPlanStore::new();
        let ledger = InMemoryAuditWriter::new();
        let (plan_id, org_id) = seed_plan(&plans);
        let plan = make_plan_result(plan_id, org_id);

        apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect("apply ok");

        assert_eq!(plans.status(plan_id).as_deref(), Some("applied"));
        assert!(plans.applied_at(plan_id).is_some());

        let stored = plans.applied_routes(plan_id).expect("row exists");
        assert_eq!(stored.len(), 1, "the one proposed route must be written");
        assert_eq!(stored[0].then.target_model, "claude-haiku-4-5");
        assert_eq!(stored[0].name, "haiku-for-cheap-classification");

        let log = ledger.list(org_id).await.expect("list ok");
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].event, "plan.applied");
        let rendered_payload = log[0].payload.to_string();
        assert!(rendered_payload.contains(&plan_id.to_string()));

        let key = ledger.verifying_key();
        verify_chain(&log, &key).expect("chain verifies");
    }

    /// An id that was never seeded yields `NotFound` and no audit entry.
    #[tokio::test]
    async fn apply_returns_not_found_for_unknown_plan() {
        let plans = InMemoryPlanStore::new();
        let ledger = InMemoryAuditWriter::new();
        let plan = make_plan_result(Uuid::now_v7(), Uuid::now_v7());

        let outcome = apply_plan(&plans, &ledger, &plan, Actor::System).await;
        let err = outcome.expect_err("unknown plan must fail");
        assert!(matches!(err, ApplyError::NotFound));

        let log = ledger.list(plan.org_id).await.expect("list ok");
        assert!(log.is_empty());
    }

    /// A second apply of the same plan is rejected and adds no audit entry.
    #[tokio::test]
    async fn apply_twice_returns_invalid_state() {
        let plans = InMemoryPlanStore::new();
        let ledger = InMemoryAuditWriter::new();
        let (plan_id, org_id) = seed_plan(&plans);
        let plan = make_plan_result(plan_id, org_id);

        apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect("first apply ok");

        let err = apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect_err("re-apply must fail");
        if let ApplyError::InvalidState { state } = &err {
            assert_eq!(state, "applied");
        } else {
            panic!("expected InvalidState, got {err:?}");
        }

        let log = ledger.list(org_id).await.expect("list ok");
        assert_eq!(log.len(), 1);
    }

    /// The stored route matches the proposed one field by field.
    #[tokio::test]
    async fn apply_persists_proposed_routes_atomically_with_status() {
        let plans = InMemoryPlanStore::new();
        let ledger = InMemoryAuditWriter::new();
        let (plan_id, org_id) = seed_plan(&plans);
        let routes = sample_routes();
        let route_id = routes[0].id;
        let plan = make_plan_result_with_routes(plan_id, org_id, routes);

        apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect("apply ok");

        assert_eq!(plans.status(plan_id).as_deref(), Some("applied"));

        let stored = plans.applied_routes(plan_id).expect("row exists");
        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0].id, route_id);
        assert_eq!(stored[0].priority, 100);
        assert_eq!(stored[0].when.model_in, vec!["claude-sonnet-4-5"]);
        assert_eq!(stored[0].then.target_model, "claude-haiku-4-5");

        let log = ledger.list(org_id).await.expect("list ok");
        assert_eq!(log.len(), 1);
        assert_eq!(log[0].event, "plan.applied");
    }

    /// A failing store leaves the row untouched and writes no audit entry.
    #[tokio::test]
    async fn route_write_failure_emits_no_audit_row() {
        let plans = InMemoryPlanStore::with_failing_route_write();
        let ledger = InMemoryAuditWriter::new();
        let (plan_id, org_id) = seed_plan(&plans);
        let plan = make_plan_result(plan_id, org_id);

        let err = apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect_err("route-write failure must surface");
        assert!(matches!(err, ApplyError::Store(_)), "got {err:?}");

        assert_eq!(plans.status(plan_id).as_deref(), Some("projected"));
        assert!(plans.applied_at(plan_id).is_none());
        let stored = plans.applied_routes(plan_id).expect("row exists");
        assert_eq!(
            stored.len(),
            0,
            "no routes may be written when the txn fails"
        );

        let log = ledger.list(org_id).await.expect("list ok");
        assert!(
            log.is_empty(),
            "audit row must NOT be emitted on a failed route write"
        );
    }

    /// A plan with no proposed routes still flips to applied and is audited.
    #[tokio::test]
    async fn apply_with_empty_routes_flips_status_and_writes_none() {
        let plans = InMemoryPlanStore::new();
        let ledger = InMemoryAuditWriter::new();
        let (plan_id, org_id) = seed_plan(&plans);
        let plan = make_plan_result_with_routes(plan_id, org_id, Vec::new());

        apply_plan(&plans, &ledger, &plan, Actor::System)
            .await
            .expect("apply ok even with no routes");

        assert_eq!(plans.status(plan_id).as_deref(), Some("applied"));
        let stored = plans.applied_routes(plan_id).expect("row exists");
        assert_eq!(stored.len(), 0);

        let log = ledger.list(org_id).await.expect("list ok");
        assert_eq!(log.len(), 1);
    }
}