1use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tt_telemetry::audit::{Actor, AuditError, AuditWriter};
19use uuid::Uuid;
20
21use crate::types::{PlanResult, ProposedRoute};
22
23#[derive(Debug, Error)]
25pub enum ApplyError {
26 #[error("plan not found")]
29 NotFound,
30 #[error("plan is in terminal state '{state}', cannot re-apply")]
33 InvalidState {
34 state: String,
36 },
37 #[error("store: {0}")]
41 Store(String),
42 #[error("audit: {0}")]
46 Audit(#[from] AuditError),
47}
48
49#[async_trait]
53pub trait PlanStore: Send + Sync {
54 async fn mark_applied(
78 &self,
79 plan_id: Uuid,
80 applied_at: DateTime<Utc>,
81 routes: &[ProposedRoute],
82 ) -> Result<Option<String>, ApplyError>;
83}
84
85#[derive(Default)]
87pub struct InMemoryPlanStore {
88 rows: Arc<Mutex<HashMap<Uuid, InMemoryRow>>>,
89 fail_route_write: bool,
95}
96
97#[derive(Debug, Clone)]
98struct InMemoryRow {
99 status: String,
100 applied_at: Option<DateTime<Utc>>,
101 applied_routes: Vec<ProposedRoute>,
104}
105
106impl InMemoryPlanStore {
107 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn with_failing_route_write() -> Self {
116 Self {
117 fail_route_write: true,
118 ..Self::default()
119 }
120 }
121
122 pub fn seed_projected(&self) -> Uuid {
125 let id = Uuid::now_v7();
126 let mut g = self.rows.lock().expect("rows lock");
127 g.insert(
128 id,
129 InMemoryRow {
130 status: "projected".into(),
131 applied_at: None,
132 applied_routes: Vec::new(),
133 },
134 );
135 id
136 }
137
138 pub fn status(&self, plan_id: Uuid) -> Option<String> {
140 let g = self.rows.lock().expect("rows lock");
141 g.get(&plan_id).map(|r| r.status.clone())
142 }
143
144 pub fn applied_at(&self, plan_id: Uuid) -> Option<DateTime<Utc>> {
146 let g = self.rows.lock().expect("rows lock");
147 g.get(&plan_id).and_then(|r| r.applied_at)
148 }
149
150 pub fn applied_routes(&self, plan_id: Uuid) -> Option<Vec<ProposedRoute>> {
154 let g = self.rows.lock().expect("rows lock");
155 g.get(&plan_id).map(|r| r.applied_routes.clone())
156 }
157}
158
159#[async_trait]
160impl PlanStore for InMemoryPlanStore {
161 async fn mark_applied(
162 &self,
163 plan_id: Uuid,
164 applied_at: DateTime<Utc>,
165 routes: &[ProposedRoute],
166 ) -> Result<Option<String>, ApplyError> {
167 if self.fail_route_write {
172 return Err(ApplyError::Store("simulated route-write failure".into()));
173 }
174
175 let mut g = self
176 .rows
177 .lock()
178 .map_err(|e| ApplyError::Store(e.to_string()))?;
179 let Some(row) = g.get_mut(&plan_id) else {
180 return Ok(None);
181 };
182 let prev = row.status.clone();
183 if prev == "projected" {
188 row.status = "applied".into();
189 row.applied_at = Some(applied_at);
190 row.applied_routes = routes.to_vec();
191 }
192 Ok(Some(prev))
193 }
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
200struct ApplyPayload {
201 plan_id: Uuid,
202 applied_at: String,
203 sample_size: u32,
204 projected_savings_usd: f64,
205}
206
207pub async fn apply_plan<S: PlanStore, A: AuditWriter>(
233 store: &S,
234 audit_writer: &A,
235 result: &PlanResult,
236 actor: Actor,
237) -> Result<(), ApplyError> {
238 let now = Utc::now();
239 let prev_status = store
240 .mark_applied(result.plan_id, now, &result.proposed_routes)
241 .await?;
242 match prev_status {
243 None => return Err(ApplyError::NotFound),
244 Some(s) if s != "projected" => {
245 return Err(ApplyError::InvalidState { state: s });
246 }
247 _ => {}
248 }
249
250 let payload = ApplyPayload {
251 plan_id: result.plan_id,
252 applied_at: now.to_rfc3339(),
253 sample_size: result.sample_size,
254 projected_savings_usd: result.aggregates.projected_savings_usd,
255 };
256 let payload_value = serde_json::to_value(&payload)
257 .map_err(|e| ApplyError::Store(format!("serialize payload: {e}")))?;
258
259 audit_writer
260 .write(
261 result.org_id,
262 actor,
263 "plan.applied".to_string(),
264 payload_value,
265 )
266 .await?;
267
268 Ok(())
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Aggregates, ConfidenceIntervals, PlanResult, RouteAction, RouteConditions};
    use chrono::Utc;
    use tt_telemetry::audit::{verify_chain, InMemoryAuditWriter};

    /// Builds a one-element route list: an enabled, priority-100 route that
    /// matches `claude-sonnet-4-5` with `input_tokens_lt = 2_000` and targets
    /// `claude-haiku-4-5`.
    fn sample_routes() -> Vec<ProposedRoute> {
        let conditions = RouteConditions {
            model_in: vec!["claude-sonnet-4-5".into()],
            input_tokens_lt: Some(2_000),
            input_tokens_gt: None,
            tag_equals: None,
        };
        let action = RouteAction {
            target_model: "claude-haiku-4-5".into(),
            force_cache_layer: None,
            fallbacks: Vec::new(),
        };

        vec![ProposedRoute {
            id: Uuid::now_v7(),
            name: "haiku-for-cheap-classification".into(),
            priority: 100,
            enabled: true,
            when: conditions,
            then: action,
        }]
    }

    /// Builds a plan result for the given ids carrying [`sample_routes`].
    fn make_plan_result(plan_id: Uuid, org_id: Uuid) -> PlanResult {
        let routes = sample_routes();
        make_plan_result_with_routes(plan_id, org_id, routes)
    }

    /// Builds a plan result with fixed aggregate numbers and the supplied
    /// proposed routes.
    fn make_plan_result_with_routes(
        plan_id: Uuid,
        org_id: Uuid,
        proposed_routes: Vec<ProposedRoute>,
    ) -> PlanResult {
        let aggregates = Aggregates {
            total_baseline_cost_usd: 10.0,
            total_projected_cost_usd: 6.0,
            projected_savings_usd: 4.0,
            projected_savings_pct: 40.0,
            cache_hit_rate_projected: 0.0,
            p50_latency_ms_projected: 100.0,
            p95_latency_ms_projected: 250.0,
            requests_rerouted: 50,
            requests_unchanged: 50,
            requests_unprice_able: 0,
            l2_projections: Vec::new(),
            l2_poisoning_candidates: 0,
        };
        let confidence_intervals = ConfidenceIntervals {
            savings_usd_95: (3.5, 4.5),
            savings_pct_95: (35.0, 45.0),
            cache_hit_rate_95: (0.0, 0.0),
            p50_latency_ms_95: (90.0, 110.0),
            p95_latency_ms_95: (200.0, 300.0),
        };

        PlanResult {
            plan_id,
            org_id,
            window_start: Utc::now(),
            window_end: Utc::now(),
            sample_size: 100,
            aggregates,
            confidence_intervals,
            per_route_breakdown: Vec::new(),
            caveats: Vec::new(),
            quality: None,
            proposed_routes,
        }
    }

    /// Happy path: the row flips to "applied", the routes are stored, and a
    /// single verifiable `plan.applied` audit entry mentions the plan id.
    #[tokio::test]
    async fn apply_marks_row_applied_and_emits_audit() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        let outcome = apply_plan(&store, &audit, &plan, Actor::System).await;
        outcome.expect("apply ok");

        // Store side: status, timestamp and routes all changed.
        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        assert!(store.applied_at(plan_id).is_some());

        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1, "the one proposed route must be written");
        let stored_route = &written[0];
        assert_eq!(stored_route.then.target_model, "claude-haiku-4-5");
        assert_eq!(stored_route.name, "haiku-for-cheap-classification");

        // Audit side: exactly one entry, carrying the plan id.
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
        let payload_text = entries[0].payload.to_string();
        let plan_id_text = plan_id.to_string();
        assert!(payload_text.contains(&plan_id_text));

        let vk = audit.verifying_key();
        verify_chain(&entries, &vk).expect("chain verifies");
    }

    /// Applying a plan id the store has never seen yields `NotFound` and
    /// writes no audit entry.
    #[tokio::test]
    async fn apply_returns_not_found_for_unknown_plan() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan = make_plan_result(Uuid::now_v7(), Uuid::now_v7());

        let outcome = apply_plan(&store, &audit, &plan, Actor::System).await;
        let err = outcome.expect_err("unknown plan must fail");
        assert!(matches!(err, ApplyError::NotFound));

        let entries = audit.list(plan.org_id).await.expect("list ok");
        assert!(entries.is_empty());
    }

    /// A second apply of the same plan is rejected with `InvalidState` and
    /// does not add a second audit entry.
    #[tokio::test]
    async fn apply_twice_returns_invalid_state() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        let first = apply_plan(&store, &audit, &plan, Actor::System).await;
        first.expect("first apply ok");

        let second = apply_plan(&store, &audit, &plan, Actor::System).await;
        let err = second.expect_err("re-apply must fail");
        match err {
            ApplyError::InvalidState { state } => {
                assert_eq!(state, "applied");
            }
            other => panic!("expected InvalidState, got {other:?}"),
        }

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }

    /// The stored routes match the proposed ones field by field, and the
    /// status flip and audit entry accompany them.
    #[tokio::test]
    async fn apply_persists_proposed_routes_atomically_with_status() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();

        let routes = sample_routes();
        let expected_route_id = routes[0].id;
        let plan = make_plan_result_with_routes(plan_id, org_id, routes);

        let outcome = apply_plan(&store, &audit, &plan, Actor::System).await;
        outcome.expect("apply ok");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1);
        let stored_route = &written[0];
        assert_eq!(stored_route.id, expected_route_id);
        assert_eq!(stored_route.priority, 100);
        assert_eq!(stored_route.when.model_in, vec!["claude-sonnet-4-5"]);
        assert_eq!(stored_route.then.target_model, "claude-haiku-4-5");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
    }

    /// When the store fails, the error surfaces as `Store`, the row is left
    /// untouched, and no audit entry is written.
    #[tokio::test]
    async fn route_write_failure_emits_no_audit_row() {
        let store = InMemoryPlanStore::with_failing_route_write();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        let outcome = apply_plan(&store, &audit, &plan, Actor::System).await;
        let err = outcome.expect_err("route-write failure must surface");
        assert!(matches!(err, ApplyError::Store(_)), "got {err:?}");

        // The row must still look exactly as seeded.
        assert_eq!(store.status(plan_id).as_deref(), Some("projected"));
        assert!(store.applied_at(plan_id).is_none());
        let stored_routes = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(
            stored_routes.len(),
            0,
            "no routes may be written when the txn fails"
        );

        let entries = audit.list(org_id).await.expect("list ok");
        assert!(
            entries.is_empty(),
            "audit row must NOT be emitted on a failed route write"
        );
    }

    /// A plan with no proposed routes still flips to "applied" and still
    /// produces one audit entry; it simply stores zero routes.
    #[tokio::test]
    async fn apply_with_empty_routes_flips_status_and_writes_none() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let no_routes: Vec<ProposedRoute> = Vec::new();
        let plan = make_plan_result_with_routes(plan_id, org_id, no_routes);

        let outcome = apply_plan(&store, &audit, &plan, Actor::System).await;
        outcome.expect("apply ok even with no routes");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        let stored_routes = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(stored_routes.len(), 0);

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }
}