Skip to main content

tt_plan_core/
apply.rs

//! Plan apply path — mark a Plan as applied + emit a `plan.applied` audit row.
//!
//! Mirrors the [`tt_auth::revoke_key`] pattern: a small free function that
//! couples a store mutation with an audit emission, so callers can't obtain
//! "apply" semantics without leaving a tamper-evident chain entry.
//!
//! The store is fronted by a trait so this library doesn't drag in sqlx —
//! the hosted cloud worker is expected to provide a `PostgresPlanStore`;
//! tests use [`InMemoryPlanStore`] from this module.
10
11use std::collections::HashMap;
12use std::sync::{Arc, Mutex};
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use thiserror::Error;
18use tt_telemetry::audit::{Actor, AuditError, AuditWriter};
19use uuid::Uuid;
20
21use crate::types::{PlanResult, ProposedRoute};
22
23/// Errors returned by the plan-apply path.
24#[derive(Debug, Error)]
25pub enum ApplyError {
26    /// The plan row was not found in the store. Either the id is wrong or
27    /// the plan has been purged.
28    #[error("plan not found")]
29    NotFound,
30    /// The plan was already in a terminal state (applied / reverted /
31    /// failed) — apply is idempotent only from `projected`.
32    #[error("plan is in terminal state '{state}', cannot re-apply")]
33    InvalidState {
34        /// The status the store reported.
35        state: String,
36    },
37    /// Store-side failure. The mutation may or may not have committed —
38    /// callers should treat as "needs investigation" rather than retrying
39    /// blindly.
40    #[error("store: {0}")]
41    Store(String),
42    /// Audit row failed to write AFTER the store mutation committed. The
43    /// plan IS applied; the chain entry was not. Caller should re-attempt
44    /// the audit emission out-of-band rather than re-applying.
45    #[error("audit: {0}")]
46    Audit(#[from] AuditError),
47}
48
49/// Persistence contract for plan_runs rows. Implementations: [`InMemoryPlanStore`]
50/// (this file, for tests), `PostgresPlanStore` (lands in the cloud worker
51/// crate when the sqlx-pool wiring is done).
52#[async_trait]
53pub trait PlanStore: Send + Sync {
54    /// Atomically (1) transition `plan_id` from status `'projected'` to
55    /// `'applied'` and stamp `applied_at`, AND (2) persist `routes` to the
56    /// Gateway routing config the live gateway reads. Return the previous
57    /// status when the row exists, or `None` when no such row.
58    ///
59    /// MUST be atomic across BOTH effects: the status flip and the route
60    /// write commit together or not at all. A partial update — status
61    /// flipped but routes not written, applied_at stamped but status
62    /// mismatched — violates the audit promise, because the `plan.applied`
63    /// chain entry asserts that the routes are now live. Implementations
64    /// MUST use a single transaction (Postgres: `BEGIN; UPDATE plan_runs;
65    /// INSERT INTO routes; COMMIT`).
66    ///
67    /// The store MUST only write `routes` when the transition actually
68    /// happens (previous status was `'projected'`). When the row is already
69    /// in a terminal state, return the previous status and write nothing —
70    /// the free function [`apply_plan`] turns that into
71    /// [`ApplyError::InvalidState`] and emits no audit row.
72    ///
73    /// `routes` is the caller's authored route set (see
74    /// [`PlanResult::proposed_routes`]); it may be empty, in which case the
75    /// status flips and no routes are written (the legacy no-op shape, for
76    /// plan rows persisted before routes were carried on the result).
77    async fn mark_applied(
78        &self,
79        plan_id: Uuid,
80        applied_at: DateTime<Utc>,
81        routes: &[ProposedRoute],
82    ) -> Result<Option<String>, ApplyError>;
83}
84
85/// In-memory store for tests. Tracks status + applied routes per `plan_id`.
86#[derive(Default)]
87pub struct InMemoryPlanStore {
88    rows: Arc<Mutex<HashMap<Uuid, InMemoryRow>>>,
89    /// When set, [`PlanStore::mark_applied`] returns
90    /// [`ApplyError::Store`] *before* mutating anything — used by tests to
91    /// prove that a route-write failure leaves the row untouched AND emits
92    /// no audit row. Mirrors a Postgres `INSERT INTO routes` failing inside
93    /// the transaction (which rolls the whole txn back).
94    fail_route_write: bool,
95}
96
97#[derive(Debug, Clone)]
98struct InMemoryRow {
99    status: String,
100    applied_at: Option<DateTime<Utc>>,
101    /// Routes written when the row transitioned to `applied`. Empty until
102    /// (and unless) a successful apply with a non-empty route set.
103    applied_routes: Vec<ProposedRoute>,
104}
105
106impl InMemoryPlanStore {
107    /// Construct an empty store.
108    pub fn new() -> Self {
109        Self::default()
110    }
111
112    /// Construct a store whose [`PlanStore::mark_applied`] always fails the
113    /// route write (and therefore the whole atomic operation). For tests
114    /// that assert no audit row is emitted when persistence fails.
115    pub fn with_failing_route_write() -> Self {
116        Self {
117            fail_route_write: true,
118            ..Self::default()
119        }
120    }
121
122    /// Seed a row in `status='projected'`. Returns the id for callers to
123    /// pass to [`apply_plan`].
124    pub fn seed_projected(&self) -> Uuid {
125        let id = Uuid::now_v7();
126        let mut g = self.rows.lock().expect("rows lock");
127        g.insert(
128            id,
129            InMemoryRow {
130                status: "projected".into(),
131                applied_at: None,
132                applied_routes: Vec::new(),
133            },
134        );
135        id
136    }
137
138    /// Read-only status lookup, used by tests to assert state transitions.
139    pub fn status(&self, plan_id: Uuid) -> Option<String> {
140        let g = self.rows.lock().expect("rows lock");
141        g.get(&plan_id).map(|r| r.status.clone())
142    }
143
144    /// Read-only applied-at lookup.
145    pub fn applied_at(&self, plan_id: Uuid) -> Option<DateTime<Utc>> {
146        let g = self.rows.lock().expect("rows lock");
147        g.get(&plan_id).and_then(|r| r.applied_at)
148    }
149
150    /// Read-only lookup of the routes persisted by a successful apply. Used
151    /// by tests to assert the proposed routes actually landed in the store
152    /// (the heart of the rv-plan-apply-writes-routes fix).
153    pub fn applied_routes(&self, plan_id: Uuid) -> Option<Vec<ProposedRoute>> {
154        let g = self.rows.lock().expect("rows lock");
155        g.get(&plan_id).map(|r| r.applied_routes.clone())
156    }
157}
158
159#[async_trait]
160impl PlanStore for InMemoryPlanStore {
161    async fn mark_applied(
162        &self,
163        plan_id: Uuid,
164        applied_at: DateTime<Utc>,
165        routes: &[ProposedRoute],
166    ) -> Result<Option<String>, ApplyError> {
167        // Simulate the route-write leg of the transaction failing. We fail
168        // BEFORE touching any row state so the post-condition matches a real
169        // txn rollback: nothing committed, so no audit row is emitted by the
170        // caller.
171        if self.fail_route_write {
172            return Err(ApplyError::Store("simulated route-write failure".into()));
173        }
174
175        let mut g = self
176            .rows
177            .lock()
178            .map_err(|e| ApplyError::Store(e.to_string()))?;
179        let Some(row) = g.get_mut(&plan_id) else {
180            return Ok(None);
181        };
182        let prev = row.status.clone();
183        // Both effects happen together, and only on the projected→applied
184        // transition. Holding the single `rows` mutex for the whole block is
185        // this store's stand-in for the Postgres transaction the contract
186        // requires.
187        if prev == "projected" {
188            row.status = "applied".into();
189            row.applied_at = Some(applied_at);
190            row.applied_routes = routes.to_vec();
191        }
192        Ok(Some(prev))
193    }
194}
195
196/// Apply audit payload — just the public-safe fields. NEVER includes the
197/// full proposed config diff (which can contain customer-specific routing
198/// patterns); that's already on the plan_runs row for join-time retrieval.
199#[derive(Debug, Clone, Serialize, Deserialize)]
200struct ApplyPayload {
201    plan_id: Uuid,
202    applied_at: String,
203    sample_size: u32,
204    projected_savings_usd: f64,
205}
206
207/// Mark a Plan as applied, persist its proposed routes, and emit a
208/// `plan.applied` audit row.
209///
210/// Two-step:
211///   1. `store.mark_applied(plan_id, now, routes)` — atomic state transition
212///      AND route persistence (both commit together; see [`PlanStore`]).
213///   2. `audit_writer.write(plan.applied, payload)` — tamper-evident record,
214///      emitted only AFTER step 1 succeeds, so the chain never asserts an
215///      apply that didn't actually write its routes.
216///
217/// The routes come from [`PlanResult::proposed_routes`], which `replay()`
218/// populates from the [`crate::types::PlanInput`]. Callers persisting the
219/// result must keep that field populated; a result deserialized from a
220/// legacy row (no routes) applies the status flip with an empty route set.
221///
222/// # Errors
223///
224/// - [`ApplyError::NotFound`] — no row matches `result.plan_id`.
225/// - [`ApplyError::InvalidState`] — row exists but is not in `projected`
226///   (already applied, reverted, or failed). No routes were written.
227/// - [`ApplyError::Store`] — the atomic status-flip + route-write could not
228///   complete; by contract nothing was committed, so no audit row is emitted.
229/// - [`ApplyError::Audit`] — store succeeded but audit emission failed.
230///   The plan IS applied (and routes ARE written) when this is returned;
231///   out-of-band recovery is the caller's responsibility.
232pub async fn apply_plan<S: PlanStore, A: AuditWriter>(
233    store: &S,
234    audit_writer: &A,
235    result: &PlanResult,
236    actor: Actor,
237) -> Result<(), ApplyError> {
238    let now = Utc::now();
239    let prev_status = store
240        .mark_applied(result.plan_id, now, &result.proposed_routes)
241        .await?;
242    match prev_status {
243        None => return Err(ApplyError::NotFound),
244        Some(s) if s != "projected" => {
245            return Err(ApplyError::InvalidState { state: s });
246        }
247        _ => {}
248    }
249
250    let payload = ApplyPayload {
251        plan_id: result.plan_id,
252        applied_at: now.to_rfc3339(),
253        sample_size: result.sample_size,
254        projected_savings_usd: result.aggregates.projected_savings_usd,
255    };
256    let payload_value = serde_json::to_value(&payload)
257        .map_err(|e| ApplyError::Store(format!("serialize payload: {e}")))?;
258
259    audit_writer
260        .write(
261            result.org_id,
262            actor,
263            "plan.applied".to_string(),
264            payload_value,
265        )
266        .await?;
267
268    Ok(())
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Aggregates, ConfidenceIntervals, PlanResult, RouteAction, RouteConditions};
    use chrono::Utc;
    use tt_telemetry::audit::{verify_chain, InMemoryAuditWriter};

    /// One non-trivial proposed route, so apply has something real to persist.
    /// Each call mints a fresh route id.
    fn sample_routes() -> Vec<ProposedRoute> {
        vec![ProposedRoute {
            id: Uuid::now_v7(),
            name: "haiku-for-cheap-classification".into(),
            priority: 100,
            enabled: true,
            when: RouteConditions {
                model_in: vec!["claude-sonnet-4-5".into()],
                input_tokens_lt: Some(2_000),
                input_tokens_gt: None,
                tag_equals: None,
            },
            then: RouteAction {
                target_model: "claude-haiku-4-5".into(),
                force_cache_layer: None,
                fallbacks: Vec::new(),
            },
        }]
    }

    fn make_plan_result(plan_id: Uuid, org_id: Uuid) -> PlanResult {
        make_plan_result_with_routes(plan_id, org_id, sample_routes())
    }

    fn make_plan_result_with_routes(
        plan_id: Uuid,
        org_id: Uuid,
        proposed_routes: Vec<ProposedRoute>,
    ) -> PlanResult {
        PlanResult {
            plan_id,
            org_id,
            window_start: Utc::now(),
            window_end: Utc::now(),
            sample_size: 100,
            aggregates: Aggregates {
                total_baseline_cost_usd: 10.0,
                total_projected_cost_usd: 6.0,
                projected_savings_usd: 4.0,
                projected_savings_pct: 40.0,
                cache_hit_rate_projected: 0.0,
                p50_latency_ms_projected: 100.0,
                p95_latency_ms_projected: 250.0,
                requests_rerouted: 50,
                requests_unchanged: 50,
                requests_unprice_able: 0,
                l2_projections: Vec::new(),
                l2_poisoning_candidates: 0,
            },
            confidence_intervals: ConfidenceIntervals {
                savings_usd_95: (3.5, 4.5),
                savings_pct_95: (35.0, 45.0),
                cache_hit_rate_95: (0.0, 0.0),
                p50_latency_ms_95: (90.0, 110.0),
                p95_latency_ms_95: (200.0, 300.0),
            },
            per_route_breakdown: Vec::new(),
            caveats: Vec::new(),
            quality: None,
            proposed_routes,
        }
    }

    #[tokio::test]
    async fn apply_marks_row_applied_and_emits_audit() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let result = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        assert!(store.applied_at(plan_id).is_some());

        // (a) The proposed routes were persisted alongside the status flip.
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1, "the one proposed route must be written");
        assert_eq!(written[0].then.target_model, "claude-haiku-4-5");
        assert_eq!(written[0].name, "haiku-for-cheap-classification");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
        assert!(entries[0]
            .payload
            .to_string()
            .contains(&plan_id.to_string()));

        // Chain integrity.
        let vk = audit.verifying_key();
        verify_chain(&entries, &vk).expect("chain verifies");
    }

    #[tokio::test]
    async fn apply_returns_not_found_for_unknown_plan() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let result = make_plan_result(Uuid::now_v7(), Uuid::now_v7());

        let err = apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect_err("unknown plan must fail");
        assert!(matches!(err, ApplyError::NotFound));

        // No audit row for a failed apply.
        let entries = audit.list(result.org_id).await.expect("list ok");
        assert!(entries.is_empty());
    }

    #[tokio::test]
    async fn apply_twice_returns_invalid_state() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let result = make_plan_result(plan_id, org_id);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("first apply ok");
        let err = apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect_err("re-apply must fail");
        match err {
            ApplyError::InvalidState { state } => assert_eq!(state, "applied"),
            other => panic!("expected InvalidState, got {other:?}"),
        }

        // Only one audit row — second attempt didn't emit.
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }

    /// A rejected re-apply must leave the first apply's routes and
    /// `applied_at` intact — the store writes routes ONLY on the
    /// projected→applied transition.
    #[tokio::test]
    async fn reapply_with_different_routes_does_not_overwrite_first_apply() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();

        let first_routes = sample_routes();
        let first_route_id = first_routes[0].id;
        let first = make_plan_result_with_routes(plan_id, org_id, first_routes);
        apply_plan(&store, &audit, &first, Actor::System)
            .await
            .expect("first apply ok");
        let first_applied_at = store.applied_at(plan_id).expect("applied_at stamped");

        // `sample_routes()` mints a fresh route id, so this set is distinct.
        let second = make_plan_result_with_routes(plan_id, org_id, sample_routes());
        let err = apply_plan(&store, &audit, &second, Actor::System)
            .await
            .expect_err("re-apply must fail");
        assert!(matches!(err, ApplyError::InvalidState { .. }), "got {err:?}");

        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1);
        assert_eq!(
            written[0].id, first_route_id,
            "a rejected re-apply must not overwrite the applied routes"
        );
        assert_eq!(store.applied_at(plan_id), Some(first_applied_at));

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }

    /// The audit payload carries only the public-safe summary fields — never
    /// the proposed route contents.
    #[tokio::test]
    async fn audit_payload_omits_route_contents() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();

        let routes = sample_routes();
        let route_id = routes[0].id;
        let result = make_plan_result_with_routes(plan_id, org_id, routes);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok");

        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        let payload = entries[0].payload.to_string();

        for key in ["plan_id", "applied_at", "sample_size", "projected_savings_usd"] {
            assert!(payload.contains(key), "payload missing {key}: {payload}");
        }
        for secret in [
            route_id.to_string(),
            "haiku-for-cheap-classification".to_string(),
            "claude-haiku-4-5".to_string(),
            "claude-sonnet-4-5".to_string(),
        ] {
            assert!(
                !payload.contains(&secret),
                "payload must not leak route contents ({secret}): {payload}"
            );
        }
    }

    /// (a) Applying a projected plan records BOTH `status='applied'` AND the
    /// exact proposed routes — the core rv-plan-apply-writes-routes contract.
    #[tokio::test]
    async fn apply_persists_proposed_routes_atomically_with_status() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();

        let routes = sample_routes();
        let route_id = routes[0].id;
        let result = make_plan_result_with_routes(plan_id, org_id, routes);

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok");

        // Status flipped.
        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        // Routes written, identity preserved.
        let written = store.applied_routes(plan_id).expect("row exists");
        assert_eq!(written.len(), 1);
        assert_eq!(written[0].id, route_id);
        assert_eq!(written[0].priority, 100);
        assert_eq!(written[0].when.model_in, vec!["claude-sonnet-4-5"]);
        assert_eq!(written[0].then.target_model, "claude-haiku-4-5");

        // And the audit row was emitted (after both effects).
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event, "plan.applied");
    }

    /// (c) When the store fails the route write, the operation is rejected
    /// AND no audit row is emitted — the chain must never assert an apply
    /// whose routes didn't land.
    #[tokio::test]
    async fn route_write_failure_emits_no_audit_row() {
        // This store has a row in `projected` but fails the atomic write.
        let store = InMemoryPlanStore::with_failing_route_write();
        let audit = InMemoryAuditWriter::new();
        // Seed a row so the failure is the route write, not a missing row.
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let result = make_plan_result(plan_id, org_id);

        let err = apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect_err("route-write failure must surface");
        assert!(matches!(err, ApplyError::Store(_)), "got {err:?}");

        // Row untouched: still projected, no routes, no applied_at.
        assert_eq!(store.status(plan_id).as_deref(), Some("projected"));
        assert!(store.applied_at(plan_id).is_none());
        assert_eq!(
            store.applied_routes(plan_id).expect("row exists").len(),
            0,
            "no routes may be written when the txn fails"
        );

        // No audit row.
        let entries = audit.list(org_id).await.expect("list ok");
        assert!(
            entries.is_empty(),
            "audit row must NOT be emitted on a failed route write"
        );
    }

    /// A legacy result (no carried routes) still applies — flips status and
    /// writes zero routes, matching the pre-fix no-op shape rather than
    /// erroring. Guards the `#[serde(default)]` round-trip path.
    #[tokio::test]
    async fn apply_with_empty_routes_flips_status_and_writes_none() {
        let store = InMemoryPlanStore::new();
        let audit = InMemoryAuditWriter::new();
        let plan_id = store.seed_projected();
        let org_id = Uuid::now_v7();
        let result = make_plan_result_with_routes(plan_id, org_id, Vec::new());

        apply_plan(&store, &audit, &result, Actor::System)
            .await
            .expect("apply ok even with no routes");

        assert_eq!(store.status(plan_id).as_deref(), Some("applied"));
        assert_eq!(store.applied_routes(plan_id).expect("row exists").len(), 0);
        let entries = audit.list(org_id).await.expect("list ok");
        assert_eq!(entries.len(), 1);
    }
}