Skip to main content

zeph_core/goal/
accounting.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Per-turn goal token accounting (G4).
5//!
6//! `GoalAccounting` is the bridge between the agent loop and `GoalStore`. It caches
7//! the active goal ID in memory to avoid a round-trip on every turn when no goal is
8//! active, and dispatches `record_turn` as a fire-and-forget background task tracked
9//! by the agent supervisor.
10
11use std::sync::Arc;
12
13use parking_lot::Mutex;
14
15use super::{Goal, GoalSnapshot, GoalStatus, GoalStore, store::GoalError};
16
/// Cached state for the current active goal.
///
/// Holds only the fields needed to build a `GoalSnapshot` and to address the
/// goal in the store; usage counters (turns/tokens) are not cached.
struct CachedGoal {
    /// Goal identifier, passed to the store when recording a turn.
    id: String,
    /// Goal text, copied into snapshots.
    text: String,
    /// Last known status of the goal; only `GoalStatus::Active` entries are
    /// exposed through snapshots or accounted per turn.
    status: GoalStatus,
    /// Optional token budget. Negative stored budgets are clamped to zero when
    /// the cache is loaded.
    token_budget: Option<u64>,
}
24
25/// Per-turn token accounting service for the active long-horizon goal.
26///
27/// Wraps `GoalStore` with an in-memory cache of the active goal ID so that
28/// turns without an active goal incur no database round-trips.
29///
30/// # Invariant (G4)
31///
32/// `on_turn_complete` is fire-and-forget. A DB write failure logs `WARN` and never
33/// aborts the turn. Budget exhaustion auto-pauses the goal via a best-effort
34/// background task.
35pub struct GoalAccounting {
36    store: Arc<GoalStore>,
37    cached: Mutex<Option<CachedGoal>>,
38}
39
40impl GoalAccounting {
41    /// Create a new `GoalAccounting` backed by `store`.
42    #[must_use]
43    pub fn new(store: Arc<GoalStore>) -> Self {
44        Self {
45            store,
46            cached: Mutex::new(None),
47        }
48    }
49
50    /// Refresh the in-memory cache from the database.
51    ///
52    /// Call this after every `/goal` command to ensure the cache reflects the
53    /// latest state before the next turn.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`GoalError`] if the database query fails.
58    pub async fn refresh(&self) -> Result<(), GoalError> {
59        let active = self.store.active().await?;
60        let mut guard = self.cached.lock();
61        *guard = active.map(|g| CachedGoal {
62            id: g.id,
63            text: g.text,
64            status: g.status,
65            token_budget: g.token_budget.map(|b| b.max(0).cast_unsigned()),
66        });
67        Ok(())
68    }
69
70    /// Build a lightweight snapshot of the cached active goal, if any.
71    ///
72    /// Returns `None` when no goal is active (the goal was cleared, completed,
73    /// paused, or never created).
74    pub fn snapshot(&self) -> Option<GoalSnapshot> {
75        let guard = self.cached.lock();
76        let cached = guard.as_ref()?;
77        if cached.status != GoalStatus::Active {
78            return None;
79        }
80        Some(GoalSnapshot {
81            id: cached.id.clone(),
82            text: cached.text.clone(),
83            status: cached.status,
84            turns_used: 0,
85            tokens_used: 0,
86            token_budget: cached.token_budget,
87        })
88    }
89
90    /// Notify the accounting service that a turn completed, consuming `turn_tokens` tokens.
91    ///
92    /// If no active goal is cached, this is a cheap no-op (no DB access).
93    ///
94    /// When a token budget is set and exceeded, the goal is auto-paused via a
95    /// best-effort background task.
96    ///
97    /// Background tasks are spawned via the provided closure so the caller controls
98    /// how they are tracked (typically via the agent supervisor).
99    pub fn on_turn_complete(
100        &self,
101        turn_tokens: u64,
102        spawn_bg: impl FnOnce(std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>),
103    ) {
104        let goal_id = {
105            let guard = self.cached.lock();
106            let Some(cached) = guard.as_ref() else { return };
107            if cached.status != GoalStatus::Active {
108                return;
109            }
110            cached.id.clone()
111        };
112
113        let store = Arc::clone(&self.store);
114
115        spawn_bg(Box::pin(async move {
116            match store.record_turn(&goal_id, turn_tokens).await {
117                Ok(updated) => {
118                    tracing::debug!(
119                        goal_id = %goal_id,
120                        turns_used = updated.turns_used,
121                        tokens_used = updated.tokens_used,
122                        "goal accounting: turn recorded"
123                    );
124                    // Auto-pause when token budget exceeded.
125                    if let (Some(budget), tokens_used) = (
126                        updated.token_budget,
127                        updated.tokens_used.max(0).cast_unsigned(),
128                    ) {
129                        let budget = budget.max(0).cast_unsigned();
130                        if tokens_used >= budget {
131                            tracing::warn!(
132                                goal_id = %goal_id,
133                                tokens_used,
134                                budget,
135                                "goal token budget exhausted — auto-pausing"
136                            );
137                            match store
138                                .transition(&goal_id, GoalStatus::Paused, updated.updated_at)
139                                .await
140                            {
141                                Ok(_) => {}
142                                Err(GoalError::StaleUpdate(_)) => {
143                                    tracing::warn!(
144                                        goal_id = %goal_id,
145                                        "goal auto-pause skipped: concurrent modification (stale update)"
146                                    );
147                                }
148                                Err(e) => {
149                                    tracing::warn!(goal_id = %goal_id, error = %e, "goal auto-pause failed");
150                                }
151                            }
152                        }
153                    }
154                }
155                Err(e) => {
156                    tracing::warn!(goal_id = %goal_id, error = %e, "goal accounting: record_turn failed");
157                }
158            }
159        }));
160    }
161
162    /// Return a full `Goal` from the store for a given id.
163    ///
164    /// Used by the `/goal status` handler to show live details.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`GoalError`] if the database query fails.
169    pub async fn get_active(&self) -> Result<Option<Goal>, GoalError> {
170        self.store.active().await
171    }
172
173    /// Return a reference to the underlying store for direct queries.
174    #[must_use]
175    pub fn get_store(&self) -> Arc<GoalStore> {
176        Arc::clone(&self.store)
177    }
178}
179
#[cfg(all(test, feature = "sqlite", not(feature = "postgres")))]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Boxed background future as handed to the `spawn_bg` callback.
    type BgFuture = std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>;

    /// Build a `GoalStore` on a fresh in-memory SQLite database with the goals
    /// table and its single-active-goal unique index.
    async fn make_store() -> Arc<GoalStore> {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        let schema = [
            "CREATE TABLE zeph_goals (\
             id TEXT PRIMARY KEY, text TEXT NOT NULL, \
             status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active','paused','completed','cleared')), \
             token_budget INTEGER, turns_used INTEGER NOT NULL DEFAULT 0, \
             tokens_used INTEGER NOT NULL DEFAULT 0, \
             created_at TEXT NOT NULL, updated_at TEXT NOT NULL, completed_at TEXT)",
            "CREATE UNIQUE INDEX idx_zeph_goals_single_active ON zeph_goals(status) WHERE status = 'active'",
        ];
        for ddl in schema {
            sqlx::query(ddl).execute(&pool).await.unwrap();
        }
        Arc::new(GoalStore::new(Arc::new(pool)))
    }

    /// Run `on_turn_complete` and hand back the future passed to `spawn_bg`,
    /// or `None` when the callback was never invoked.
    fn capture_turn(accounting: &GoalAccounting, turn_tokens: u64) -> Option<BgFuture> {
        let mut captured = None;
        accounting.on_turn_complete(turn_tokens, |fut| {
            captured = Some(fut);
        });
        captured
    }

    #[tokio::test]
    async fn snapshot_returns_none_when_no_active_goal() {
        let accounting = GoalAccounting::new(make_store().await);
        assert!(accounting.snapshot().is_none());
    }

    #[tokio::test]
    async fn refresh_populates_cache_from_db() {
        let store = make_store().await;
        store.create("buy groceries", None, 400).await.unwrap();
        let accounting = GoalAccounting::new(Arc::clone(&store));

        // Nothing is cached until `refresh` is called explicitly.
        assert!(accounting.snapshot().is_none(), "cache starts empty");

        accounting.refresh().await.unwrap();
        let snapshot = accounting.snapshot().expect("snapshot after refresh");
        assert_eq!(snapshot.text, "buy groceries");
        assert_eq!(snapshot.status, GoalStatus::Active);
    }

    #[tokio::test]
    async fn snapshot_returns_none_for_paused_goal() {
        let store = make_store().await;
        let created = store.create("do thing", None, 400).await.unwrap();
        store
            .transition(&created.id, GoalStatus::Paused, created.updated_at)
            .await
            .unwrap();

        let accounting = GoalAccounting::new(Arc::clone(&store));
        accounting.refresh().await.unwrap();

        // A paused goal must stay invisible to snapshot consumers.
        assert!(accounting.snapshot().is_none());
    }

    #[tokio::test]
    async fn on_turn_complete_is_noop_when_cache_empty() {
        let accounting = GoalAccounting::new(make_store().await);
        let spawned = capture_turn(&accounting, 100);
        assert!(
            spawned.is_none(),
            "spawn_bg must not be called when no active goal"
        );
    }

    #[tokio::test]
    async fn on_turn_complete_spawns_background_task() {
        let store = make_store().await;
        store.create("active goal", None, 400).await.unwrap();
        let accounting = GoalAccounting::new(Arc::clone(&store));
        accounting.refresh().await.unwrap();

        let spawned = capture_turn(&accounting, 500);
        assert!(
            spawned.is_some(),
            "spawn_bg must be called when active goal exists"
        );

        // Nothing is written until the captured future is driven to completion.
        spawned.unwrap().await;

        // The single turn must be reflected in the persisted counters.
        let goals = store.list(10).await.unwrap();
        let active = goals
            .iter()
            .find(|goal| goal.status == GoalStatus::Active)
            .unwrap();
        assert_eq!(active.tokens_used, 500);
        assert_eq!(active.turns_used, 1);
    }

    #[tokio::test]
    async fn auto_pause_on_budget_exhaustion() {
        let store = make_store().await;
        // Budget of 100 tokens; the single turn below consumes 200.
        store.create("budget goal", Some(100), 400).await.unwrap();
        let accounting = GoalAccounting::new(Arc::clone(&store));
        accounting.refresh().await.unwrap();

        capture_turn(&accounting, 200).unwrap().await;

        // Overshooting the budget must have paused the goal in the store.
        let goals = store.list(10).await.unwrap();
        let persisted = goals.first().unwrap();
        assert_eq!(
            persisted.status,
            GoalStatus::Paused,
            "goal must be auto-paused when budget exhausted"
        );
    }
}