Skip to main content

tt_plan_core/
apply.rs

//! Plan apply path — mark a Plan as applied, persist its proposed routes,
//! and emit a `plan.applied` audit row.
//!
//! Mirrors the [`tt_auth::revoke_key`] pattern: a small free function that
//! couples a store mutation with an audit emission, so callers cannot obtain
//! "apply" semantics without leaving a tamper-evident chain entry.
//!
//! The store is fronted by a trait so this library doesn't drag in sqlx —
//! the hosted cloud worker provides a `PostgresPlanStore`; tests use
//! [`InMemoryPlanStore`] from this module.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tt_telemetry::audit::{Actor, AuditError, AuditWriter};
19use uuid::Uuid;
20
21use crate::types::{PlanResult, ProposedRoute};
22
23/// Errors returned by the plan-apply path.
24#[derive(Debug, Error)]
25pub enum ApplyError {
26    /// The plan row was not found in the store. Either the id is wrong or
27    /// the plan has been purged.
28    #[error("plan not found")]
29    NotFound,
30    /// The plan was already in a terminal state (applied / reverted /
31    /// failed) — apply is idempotent only from `projected`.
32    #[error("plan is in terminal state '{state}', cannot re-apply")]
33    InvalidState {
34        /// The status the store reported.
35        state: String,
36    },
37    /// Store-side failure. The mutation may or may not have committed —
38    /// callers should treat as "needs investigation" rather than retrying
39    /// blindly.
40    #[error("store: {0}")]
41    Store(String),
42    /// Audit row failed to write AFTER the store mutation committed. The
43    /// plan IS applied; the chain entry was not. Caller should re-attempt
44    /// the audit emission out-of-band rather than re-applying.
45    #[error("audit: {0}")]
46    Audit(#[from] AuditError),
47}
48
49/// Persistence contract for plan_runs rows. Implementations: [`InMemoryPlanStore`]
50/// (this file, for tests), `PostgresPlanStore` (lands in the cloud worker
51/// crate when the sqlx-pool wiring is done).
52#[async_trait]
53pub trait PlanStore: Send + Sync {
54    /// Atomically (1) transition `plan_id` from status `'projected'` to
55    /// `'applied'` and stamp `applied_at`, AND (2) persist `routes` to the
56    /// Gateway routing config the live gateway reads. Return the previous
57    /// status when the row exists, or `None` when no such row.
58    ///
59    /// MUST be atomic across BOTH effects: the status flip and the route
60    /// write commit together or not at all. A partial update — status
61    /// flipped but routes not written, applied_at stamped but status
62    /// mismatched — violates the audit promise, because the `plan.applied`
63    /// chain entry asserts that the routes are now live. Implementations
64    /// MUST use a single transaction (Postgres: `BEGIN; UPDATE plan_runs;
65    /// INSERT INTO routes; COMMIT`).
66    ///
67    /// The store MUST only write `routes` when the transition actually
68    /// happens (previous status was `'projected'`). When the row is already
69    /// in a terminal state, return the previous status and write nothing —
70    /// the free function [`apply_plan`] turns that into
71    /// [`ApplyError::InvalidState`] and emits no audit row.
72    ///
73    /// `routes` is the caller's authored route set (see
74    /// [`PlanResult::proposed_routes`]); it may be empty, in which case the
75    /// status flips and no routes are written (the legacy no-op shape, for
76    /// plan rows persisted before routes were carried on the result).
77    async fn mark_applied(
78        &self,
79        plan_id: Uuid,
80        applied_at: DateTime<Utc>,
81        routes: &[ProposedRoute],
82    ) -> Result<Option<String>, ApplyError>;
83}
84
85/// In-memory store for tests. Tracks status + applied routes per `plan_id`.
86#[derive(Default)]
87pub struct InMemoryPlanStore {
88    rows: Arc<Mutex<HashMap<Uuid, InMemoryRow>>>,
89    /// When set, [`PlanStore::mark_applied`] returns
90    /// [`ApplyError::Store`] *before* mutating anything — used by tests to
91    /// prove that a route-write failure leaves the row untouched AND emits
92    /// no audit row. Mirrors a Postgres `INSERT INTO routes` failing inside
93    /// the transaction (which rolls the whole txn back).
94    fail_route_write: bool,
95}
96
97#[derive(Debug, Clone)]
98struct InMemoryRow {
99    status: String,
100    applied_at: Option<DateTime<Utc>>,
101    /// Routes written when the row transitioned to `applied`. Empty until
102    /// (and unless) a successful apply with a non-empty route set.
103    applied_routes: Vec<ProposedRoute>,
104}
105
106impl InMemoryPlanStore {
107    /// Construct an empty store.
108    pub fn new() -> Self {
109        Self::default()
110    }
111
112    /// Construct a store whose [`PlanStore::mark_applied`] always fails the
113    /// route write (and therefore the whole atomic operation). For tests
114    /// that assert no audit row is emitted when persistence fails.
115    pub fn with_failing_route_write() -> Self {
116        Self {
117            fail_route_write: true,
118            ..Self::default()
119        }
120    }
121
122    /// Seed a row in `status='projected'`. Returns the id for callers to
123    /// pass to [`apply_plan`].
124    pub fn seed_projected(&self) -> Uuid {
125        let id = Uuid::now_v7();
126        let mut g = self.rows.lock().expect("rows lock");
127        g.insert(
128            id,
129            InMemoryRow {
130                status: "projected".into(),
131                applied_at: None,
132                applied_routes: Vec::new(),
133            },
134        );
135        id
136    }
137
138    /// Read-only status lookup, used by tests to assert state transitions.
139    pub fn status(&self, plan_id: Uuid) -> Option<String> {
140        let g = self.rows.lock().expect("rows lock");
141        g.get(&plan_id).map(|r| r.status.clone())
142    }
143
144    /// Read-only applied-at lookup.
145    pub fn applied_at(&self, plan_id: Uuid) -> Option<DateTime<Utc>> {
146        let g = self.rows.lock().expect("rows lock");
147        g.get(&plan_id).and_then(|r| r.applied_at)
148    }
149
150    /// Read-only lookup of the routes persisted by a successful apply. Used
151    /// by tests to assert the proposed routes actually landed in the store
152    /// (the heart of the rv-plan-apply-writes-routes fix).
153    pub fn applied_routes(&self, plan_id: Uuid) -> Option<Vec<ProposedRoute>> {
154        let g = self.rows.lock().expect("rows lock");
155        g.get(&plan_id).map(|r| r.applied_routes.clone())
156    }
157}
158
159#[async_trait]
160impl PlanStore for InMemoryPlanStore {
161    async fn mark_applied(
162        &self,
163        plan_id: Uuid,
164        applied_at: DateTime<Utc>,
165        routes: &[ProposedRoute],
166    ) -> Result<Option<String>, ApplyError> {
167        // Simulate the route-write leg of the transaction failing. We fail
168        // BEFORE touching any row state so the post-condition matches a real
169        // txn rollback: nothing committed, so no audit row is emitted by the
170        // caller.
171        if self.fail_route_write {
172            return Err(ApplyError::Store("simulated route-write failure".into()));
173        }
174
175        let mut g = self
176            .rows
177            .lock()
178            .map_err(|e| ApplyError::Store(e.to_string()))?;
179        let Some(row) = g.get_mut(&plan_id) else {
180            return Ok(None);
181        };
182        let prev = row.status.clone();
183        // Both effects happen together, and only on the projected→applied
184        // transition. Holding the single `rows` mutex for the whole block is
185        // this store's stand-in for the Postgres transaction the contract
186        // requires.
187        if prev == "projected" {
188            row.status = "applied".into();
189            row.applied_at = Some(applied_at);
190            row.applied_routes = routes.to_vec();
191        }
192        Ok(Some(prev))
193    }
194}
195
196/// Apply audit payload — just the public-safe fields. NEVER includes the
197/// full proposed config diff (which can contain customer-specific routing
198/// patterns); that's already on the plan_runs row for join-time retrieval.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200struct ApplyPayload {
201    plan_id: Uuid,
202    applied_at: String,
203    sample_size: u32,
204    projected_savings_usd: f64,
205}
206
207/// Mark a Plan as applied, persist its proposed routes, and emit a
208/// `plan.applied` audit row.
209///
210/// Two-step:
211///   1. `store.mark_applied(plan_id, now, routes)` — atomic state transition
212///      AND route persistence (both commit together; see [`PlanStore`]).
213///   2. `audit_writer.write(plan.applied, payload)` — tamper-evident record,
214///      emitted only AFTER step 1 succeeds, so the chain never asserts an
215///      apply that didn't actually write its routes.
216///
217/// The routes come from [`PlanResult::proposed_routes`], which `replay()`
218/// populates from the [`crate::types::PlanInput`]. Callers persisting the
219/// result must keep that field populated; a result deserialized from a
220/// legacy row (no routes) applies the status flip with an empty route set.
221///
222/// # Errors
223///
224/// - [`ApplyError::NotFound`] — no row matches `result.plan_id`.
225/// - [`ApplyError::InvalidState`] — row exists but is not in `projected`
226///   (already applied, reverted, or failed). No routes were written.
227/// - [`ApplyError::Store`] — the atomic status-flip + route-write could not
228///   complete; by contract nothing was committed, so no audit row is emitted.
229/// - [`ApplyError::Audit`] — store succeeded but audit emission failed.
230///   The plan IS applied (and routes ARE written) when this is returned;
231///   out-of-band recovery is the caller's responsibility.
232pub async fn apply_plan<S: PlanStore, A: AuditWriter>(
233    store: &S,
234    audit_writer: &A,
235    result: &PlanResult,
236    actor: Actor,
237) -> Result<(), ApplyError> {
238    let now = Utc::now();
239    let prev_status = store
240        .mark_applied(result.plan_id, now, &result.proposed_routes)
241        .await?;
242    match prev_status {
243        None => return Err(ApplyError::NotFound),
244        Some(s) if s != "projected" => {
245            return Err(ApplyError::InvalidState { state: s });
246        }
247        _ => {}
248    }
249
250    let payload = ApplyPayload {
251        plan_id: result.plan_id,
252        applied_at: now.to_rfc3339(),
253        sample_size: result.sample_size,
254        projected_savings_usd: result.aggregates.projected_savings_usd,
255    };
256    let payload_value = serde_json::to_value(&payload)
257        .map_err(|e| ApplyError::Store(format!("serialize payload: {e}")))?;
258
259    audit_writer
260        .write(
261            result.org_id,
262            actor,
263            "plan.applied".to_string(),
264            payload_value,
265        )
266        .await?;
267
268    Ok(())
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Aggregates, ConfidenceIntervals, PlanResult, RouteAction, RouteConditions};
    use chrono::Utc;
    use tt_telemetry::audit::{verify_chain, InMemoryAuditWriter};

    /// Common fixture: a working store holding one `projected` row, an empty
    /// audit writer, the seeded plan id, and a fresh org id.
    fn seeded() -> (InMemoryPlanStore, InMemoryAuditWriter, Uuid, Uuid) {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        (store, audit, plan_id, org_id)
    }

    /// A single non-trivial proposed route, so that apply has something real
    /// to persist.
    fn sample_routes() -> Vec<ProposedRoute> {
        let when = RouteConditions {
            model_in: vec!["claude-sonnet-4-5".into()],
            input_tokens_lt: Some(2_000),
            input_tokens_gt: None,
            tag_equals: None,
            has_images: None,
            has_audio: None,
            prompt_contains_any_of: vec![],
            estimated_cost_gt: None,
            estimated_cost_lt: None,
        };
        let then = RouteAction {
            target_model: "claude-haiku-4-5".into(),
            fallbacks: Vec::new(),
            disable_cache: false,
            max_cost_usd: None,
        };
        vec![ProposedRoute {
            id: Uuid::now_v7(),
            name: "haiku-for-cheap-classification".into(),
            priority: 100,
            enabled: true,
            when,
            then,
        }]
    }

    /// Plan result carrying the default [`sample_routes`].
    fn make_plan_result(plan_id: Uuid, org_id: Uuid) -> PlanResult {
        make_plan_result_with_routes(plan_id, org_id, sample_routes())
    }

    /// Plan result with fixed aggregate numbers and the given route set.
    fn make_plan_result_with_routes(
        plan_id: Uuid,
        org_id: Uuid,
        proposed_routes: Vec<ProposedRoute>,
    ) -> PlanResult {
        let aggregates = Aggregates {
            total_baseline_cost_usd: 10.0,
            total_projected_cost_usd: 6.0,
            projected_savings_usd: 4.0,
            projected_savings_pct: 40.0,
            cache_hit_rate_projected: 0.0,
            p50_latency_ms_projected: 100.0,
            p95_latency_ms_projected: 250.0,
            requests_rerouted: 50,
            requests_unchanged: 50,
            requests_unprice_able: 0,
            l2_projections: Vec::new(),
            l2_poisoning_candidates: 0,
        };
        let confidence_intervals = ConfidenceIntervals {
            savings_usd_95: (3.5, 4.5),
            savings_pct_95: (35.0, 45.0),
            cache_hit_rate_95: (0.0, 0.0),
            p50_latency_ms_95: (90.0, 110.0),
            p95_latency_ms_95: (200.0, 300.0),
        };
        PlanResult {
            plan_id,
            org_id,
            window_start: Utc::now(),
            window_end: Utc::now(),
            sample_size: 100,
            aggregates,
            confidence_intervals,
            per_route_breakdown: Vec::new(),
            caveats: Vec::new(),
            quality: None,
            proposed_routes,
        }
    }

    /// Happy path: the row flips to `applied`, the routes are stored, one
    /// `plan.applied` audit row is written, and the chain verifies.
    #[tokio::test]
    async fn apply_marks_row_applied_and_emits_audit() {
        let (store, audit, plan_id, org_id) = seeded();
        let result = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        assert!(store.applied_at(plan_id).is_some());

        // (a) The proposed routes were persisted alongside the status flip.
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1, "the one proposed route must be written");
        let route = &written[0];
        assert_eq!(route.then.target_model, "claude-haiku-4-5");
        assert_eq!(route.name, "haiku-for-cheap-classification");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
        let payload_text = entries[0].payload.to_string();
        assert!(payload_text.contains(&plan_id.to_string()));

        // Chain integrity.
        let vk = audit.verifying_key();
        verify_chain(&entries, &vk).expect("chain verifies");
    }

    /// An id the store has never seen yields `NotFound` and no audit row.
    #[tokio::test]
    async fn apply_returns_not_found_for_unknown_plan() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let result = make_plan_result(Uuid::now_v7(), Uuid::now_v7());

        let outcome = apply_plan(&store, &audit, &result, Actor::System).await;
        let err = outcome.expect_err("unknown plan must fail");
        assert!(matches!(err, ApplyError::NotFound));

        // No audit row for a failed apply.
        let entries = audit.list(result.org_id).await.expect("list ok");
        assert!(entries.is_empty());
    }

    /// A second apply of the same plan is rejected with `InvalidState` and
    /// leaves the audit chain at a single entry.
    #[tokio::test]
    async fn apply_twice_returns_invalid_state() {
        let (store, audit, plan_id, org_id) = seeded();
        let result = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("first apply ok");
        let err = apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect_err("re-apply must fail");
        if let ApplyError::InvalidState { state } = err {
            assert_eq!(state, "applied");
        } else {
            panic!("expected InvalidState, got {err:?}");
        }

        // Only one audit row — second attempt didn't emit.
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }

    /// (a) Applying a projected plan records BOTH `status='applied'` AND the
    /// exact proposed routes — the core rv-plan-apply-writes-routes contract.
    #[tokio::test]
    async fn apply_persists_proposed_routes_atomically_with_status() {
        let (store, audit, plan_id, org_id) = seeded();

        let routes = sample_routes();
        let route_id = routes[0].id;
        let result = make_plan_result_with_routes(plan_id, org_id, routes);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok");

        // Status flipped.
        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));

        // Routes written, identity preserved.
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1);
        let route = &written[0];
        assert_eq!(route.id, route_id);
        assert_eq!(route.priority, 100);
        assert_eq!(route.when.model_in, vec!["claude-sonnet-4-5"]);
        assert_eq!(route.then.target_model, "claude-haiku-4-5");

        // And the audit row was emitted (after both effects).
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
    }

    /// (c) When the store fails the route write, the operation is rejected
    /// AND no audit row is emitted — the chain must never assert an apply
    /// whose routes didn't land.
    #[tokio::test]
    async fn route_write_failure_emits_no_audit_row() {
        // The store holds a `projected` row but fails the atomic write, so
        // the failure under test is the route write, not a missing row.
        let store = InMemoryPlanStore::with_failing_route_write();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let result = make_plan_result(plan_id, org_id);

        let err = apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect_err("route-write failure must surface");
        assert!(matches!(err, ApplyError::Store(_)), "got {err:?}");

        // Row untouched: still projected, no routes, no applied_at.
        assert_eq!(store.status(plan_id).as_deref(), Some("projected"));
        assert!(store.applied_at(plan_id).is_none());
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(
            written.len(),
            0,
            "no routes may be written when the txn fails"
        );

        // No audit row.
        let entries = audit.list(org_id).await.expect("list ok");
        assert!(
            entries.is_empty(),
            "audit row must NOT be emitted on a failed route write"
        );
    }

    /// A legacy result (no carried routes) still applies — flips status and
    /// writes zero routes, matching the pre-fix no-op shape rather than
    /// erroring. Guards the `#[serde(default)]` round-trip path.
    #[tokio::test]
    async fn apply_with_empty_routes_flips_status_and_writes_none() {
        let (store, audit, plan_id, org_id) = seeded();
        let result = make_plan_result_with_routes(plan_id, org_id, Vec::new());

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok even with no routes");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 0);
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }
}