1use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tt_telemetry::audit::{Actor, AuditError, AuditWriter};
19use uuid::Uuid;
20
21use crate::types::{PlanResult, ProposedRoute};
22
23#[derive(Debug, Error)]
25pub enum ApplyError {
26 #[error("plan not found")]
29 NotFound,
30 #[error("plan is in terminal state '{state}', cannot re-apply")]
33 InvalidState {
34 state: String,
36 },
37 #[error("store: {0}")]
41 Store(String),
42 #[error("audit: {0}")]
46 Audit(#[from] AuditError),
47}
48
49#[async_trait]
53pub trait PlanStore: Send + Sync {
54 async fn mark_applied(
78 &self,
79 plan_id: Uuid,
80 applied_at: DateTime<Utc>,
81 routes: &[ProposedRoute],
82 ) -> Result<Option<String>, ApplyError>;
83}
84
85#[derive(Default)]
87pub struct InMemoryPlanStore {
88 rows: Arc<Mutex<HashMap<Uuid, InMemoryRow>>>,
89 fail_route_write: bool,
95}
96
97#[derive(Debug, Clone)]
98struct InMemoryRow {
99 status: String,
100 applied_at: Option<DateTime<Utc>>,
101 applied_routes: Vec<ProposedRoute>,
104}
105
106impl InMemoryPlanStore {
107 pub fn new() -> Self {
109 Self::default()
110 }
111
112 pub fn with_failing_route_write() -> Self {
116 Self {
117 fail_route_write: true,
118 ..Self::default()
119 }
120 }
121
122 pub fn seed_projected(&self) -> Uuid {
125 let id = Uuid::now_v7();
126 let mut g = self.rows.lock().expect("rows lock");
127 g.insert(
128 id,
129 InMemoryRow {
130 status: "projected".into(),
131 applied_at: None,
132 applied_routes: Vec::new(),
133 },
134 );
135 id
136 }
137
138 pub fn status(&self, plan_id: Uuid) -> Option<String> {
140 let g = self.rows.lock().expect("rows lock");
141 g.get(&plan_id).map(|r| r.status.clone())
142 }
143
144 pub fn applied_at(&self, plan_id: Uuid) -> Option<DateTime<Utc>> {
146 let g = self.rows.lock().expect("rows lock");
147 g.get(&plan_id).and_then(|r| r.applied_at)
148 }
149
150 pub fn applied_routes(&self, plan_id: Uuid) -> Option<Vec<ProposedRoute>> {
154 let g = self.rows.lock().expect("rows lock");
155 g.get(&plan_id).map(|r| r.applied_routes.clone())
156 }
157}
158
159#[async_trait]
160impl PlanStore for InMemoryPlanStore {
161 async fn mark_applied(
162 &self,
163 plan_id: Uuid,
164 applied_at: DateTime<Utc>,
165 routes: &[ProposedRoute],
166 ) -> Result<Option<String>, ApplyError> {
167 if self.fail_route_write {
172 return Err(ApplyError::Store("simulated route-write failure".into()));
173 }
174
175 let mut g = self
176 .rows
177 .lock()
178 .map_err(|e| ApplyError::Store(e.to_string()))?;
179 let Some(row) = g.get_mut(&plan_id) else {
180 return Ok(None);
181 };
182 let prev = row.status.clone();
183 if prev == "projected" {
188 row.status = "applied".into();
189 row.applied_at = Some(applied_at);
190 row.applied_routes = routes.to_vec();
191 }
192 Ok(Some(prev))
193 }
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
200struct ApplyPayload {
201 plan_id: Uuid,
202 applied_at: String,
203 sample_size: u32,
204 projected_savings_usd: f64,
205}
206
207pub async fn apply_plan<S: PlanStore, A: AuditWriter>(
233 store: &S,
234 audit_writer: &A,
235 result: &PlanResult,
236 actor: Actor,
237) -> Result<(), ApplyError> {
238 let now = Utc::now();
239 let prev_status = store
240 .mark_applied(result.plan_id, now, &result.proposed_routes)
241 .await?;
242 match prev_status {
243 None => return Err(ApplyError::NotFound),
244 Some(s) if s != "projected" => {
245 return Err(ApplyError::InvalidState { state: s });
246 }
247 _ => {}
248 }
249
250 let payload = ApplyPayload {
251 plan_id: result.plan_id,
252 applied_at: now.to_rfc3339(),
253 sample_size: result.sample_size,
254 projected_savings_usd: result.aggregates.projected_savings_usd,
255 };
256 let payload_value = serde_json::to_value(&payload)
257 .map_err(|e| ApplyError::Store(format!("serialize payload: {e}")))?;
258
259 audit_writer
260 .write(
261 result.org_id,
262 actor,
263 "plan.applied".to_string(),
264 payload_value,
265 )
266 .await?;
267
268 Ok(())
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Aggregates, ConfidenceIntervals, PlanResult, RouteAction, RouteConditions};
    use chrono::Utc;
    use tt_telemetry::audit::{verify_chain, InMemoryAuditWriter};

    /// Asserts that the store currently reports `expected` as the plan status.
    #[track_caller]
    fn assert_status(store: &InMemoryPlanStore, plan_id: Uuid, expected: &str) {
        assert_eq!(store.status(plan_id).as_deref(), Some(expected));
    }

    /// Builds a working store that already holds one `"projected"` plan.
    fn seeded_store() -> (InMemoryPlanStore, Uuid) {
        let store = InMemoryPlanStore::new();
        let plan_id = store.seed_projected();
        (store, plan_id)
    }

    /// A single enabled route sending small sonnet requests to haiku.
    fn sample_routes() -> Vec<ProposedRoute> {
        let when = RouteConditions {
            model_in: vec!["claude-sonnet-4-5".into()],
            input_tokens_lt: Some(2_000),
            input_tokens_gt: None,
            tag_equals: None,
            has_images: None,
            has_audio: None,
            prompt_contains_any_of: Vec::new(),
            estimated_cost_gt: None,
            estimated_cost_lt: None,
        };
        let then = RouteAction {
            target_model: "claude-haiku-4-5".into(),
            fallbacks: Vec::new(),
            disable_cache: false,
            max_cost_usd: None,
        };

        vec![ProposedRoute {
            id: Uuid::now_v7(),
            name: "haiku-for-cheap-classification".into(),
            priority: 100,
            enabled: true,
            when,
            then,
        }]
    }

    /// Plan result carrying the default [`sample_routes`].
    fn make_plan_result(plan_id: Uuid, org_id: Uuid) -> PlanResult {
        make_plan_result_with_routes(plan_id, org_id, sample_routes())
    }

    /// Plan result with fixed aggregates and the given proposed routes.
    fn make_plan_result_with_routes(
        plan_id: Uuid,
        org_id: Uuid,
        proposed_routes: Vec<ProposedRoute>,
    ) -> PlanResult {
        let aggregates = Aggregates {
            total_baseline_cost_usd: 10.0,
            total_projected_cost_usd: 6.0,
            projected_savings_usd: 4.0,
            projected_savings_pct: 40.0,
            cache_hit_rate_projected: 0.0,
            p50_latency_ms_projected: 100.0,
            p95_latency_ms_projected: 250.0,
            requests_rerouted: 50,
            requests_unchanged: 50,
            requests_unprice_able: 0,
            l2_projections: Vec::new(),
            l2_poisoning_candidates: 0,
        };
        let confidence_intervals = ConfidenceIntervals {
            savings_usd_95: (3.5, 4.5),
            savings_pct_95: (35.0, 45.0),
            cache_hit_rate_95: (0.0, 0.0),
            p50_latency_ms_95: (90.0, 110.0),
            p95_latency_ms_95: (200.0, 300.0),
        };

        PlanResult {
            plan_id,
            org_id,
            window_start: Utc::now(),
            window_end: Utc::now(),
            sample_size: 100,
            aggregates,
            confidence_intervals,
            per_route_breakdown: Vec::new(),
            caveats: Vec::new(),
            quality: None,
            proposed_routes,
        }
    }

    /// Happy path: status flips, routes are stored, one verifiable audit entry.
    #[tokio::test]
    async fn apply_marks_row_applied_and_emits_audit() {
        let (store, plan_id) = seeded_store();
        let audit = InMemoryAuditWriter::new();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect("apply ok");

        assert_status(&store, plan_id, "applied");
        assert!(store.applied_at(plan_id).is_some());

        let persisted = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(persisted.len(), 1, "the one proposed route must be written");
        let route = &persisted[0];
        assert_eq!(route.then.target_model, "claude-haiku-4-5");
        assert_eq!(route.name, "haiku-for-cheap-classification");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
        let rendered_payload = entries[0].payload.to_string();
        assert!(rendered_payload.contains(&plan_id.to_string()));

        let vk = audit.verifying_key();
        verify_chain(&entries, &vk).expect("chain verifies");
    }

    /// Unknown plan id: `NotFound`, and nothing is audited.
    #[tokio::test]
    async fn apply_returns_not_found_for_unknown_plan() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan = make_plan_result(Uuid::now_v7(), Uuid::now_v7());

        let err = apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect_err("unknown plan must fail");
        assert!(matches!(err, ApplyError::NotFound));

        let entries = audit.list(plan.org_id).await.expect("list ok");
        assert!(entries.is_empty());
    }

    /// A second apply is rejected and does not add another audit entry.
    #[tokio::test]
    async fn apply_twice_returns_invalid_state() {
        let (store, plan_id) = seeded_store();
        let audit = InMemoryAuditWriter::new();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect("first apply ok");
        let second = apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect_err("re-apply must fail");
        match second {
            ApplyError::InvalidState { state } => assert_eq!(state, "applied"),
            other => panic!("expected InvalidState, got {other:?}"),
        }

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }

    /// The stored route matches the proposed one field for field.
    #[tokio::test]
    async fn apply_persists_proposed_routes_atomically_with_status() {
        let (store, plan_id) = seeded_store();
        let audit = InMemoryAuditWriter::new();
        let org_id = Uuid::now_v7();

        let proposed = sample_routes();
        let expected_route_id = proposed[0].id;
        let plan = make_plan_result_with_routes(plan_id, org_id, proposed);

        apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect("apply ok");

        assert_status(&store, plan_id, "applied");
        let persisted = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(persisted.len(), 1);
        let route = &persisted[0];
        assert_eq!(route.id, expected_route_id);
        assert_eq!(route.priority, 100);
        assert_eq!(route.when.model_in, vec!["claude-sonnet-4-5"]);
        assert_eq!(route.then.target_model, "claude-haiku-4-5");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
    }

    /// A failing store leaves the row untouched and emits no audit entry.
    #[tokio::test]
    async fn route_write_failure_emits_no_audit_row() {
        let store = InMemoryPlanStore::with_failing_route_write();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result(plan_id, org_id);

        let err = apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect_err("route-write failure must surface");
        assert!(matches!(err, ApplyError::Store(_)), "got {err:?}");

        assert_status(&store, plan_id, "projected");
        assert!(store.applied_at(plan_id).is_none());
        let persisted = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(
            persisted.len(),
            0,
            "no routes may be written when the txn fails"
        );

        let entries = audit.list(org_id).await.expect("list ok");
        assert!(
            entries.is_empty(),
            "audit row must NOT be emitted on a failed route write"
        );
    }

    /// An empty route list still flips the status and audits once.
    #[tokio::test]
    async fn apply_with_empty_routes_flips_status_and_writes_none() {
        let (store, plan_id) = seeded_store();
        let audit = InMemoryAuditWriter::new();
        let org_id = Uuid::now_v7();
        let plan = make_plan_result_with_routes(plan_id, org_id, Vec::new());

        apply_plan(&store, &audit, &plan, Actor::System)
            .await
            .expect("apply ok even with no routes");

        assert_status(&store, plan_id, "applied");
        assert_eq!(store.applied_routes(plan_id).expect("row exists").len(), 0);
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }
}