zeph_core/goal/
accounting.rs1use std::sync::Arc;
12
13use parking_lot::Mutex;
14
15use super::{Goal, GoalSnapshot, GoalStatus, GoalStore, store::GoalError};
16
17struct CachedGoal {
19 id: String,
20 text: String,
21 status: GoalStatus,
22 token_budget: Option<u64>,
23}
24
25pub struct GoalAccounting {
36 store: Arc<GoalStore>,
37 cached: Mutex<Option<CachedGoal>>,
38}
39
40impl GoalAccounting {
41 #[must_use]
43 pub fn new(store: Arc<GoalStore>) -> Self {
44 Self {
45 store,
46 cached: Mutex::new(None),
47 }
48 }
49
50 pub async fn refresh(&self) -> Result<(), GoalError> {
59 let active = self.store.active().await?;
60 let mut guard = self.cached.lock();
61 *guard = active.map(|g| CachedGoal {
62 id: g.id,
63 text: g.text,
64 status: g.status,
65 token_budget: g.token_budget.map(|b| b.max(0).cast_unsigned()),
66 });
67 Ok(())
68 }
69
70 pub fn snapshot(&self) -> Option<GoalSnapshot> {
75 let guard = self.cached.lock();
76 let cached = guard.as_ref()?;
77 if cached.status != GoalStatus::Active {
78 return None;
79 }
80 Some(GoalSnapshot {
81 id: cached.id.clone(),
82 text: cached.text.clone(),
83 status: cached.status,
84 turns_used: 0,
85 tokens_used: 0,
86 token_budget: cached.token_budget,
87 })
88 }
89
90 pub fn on_turn_complete(
100 &self,
101 turn_tokens: u64,
102 spawn_bg: impl FnOnce(std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>),
103 ) {
104 let goal_id = {
105 let guard = self.cached.lock();
106 let Some(cached) = guard.as_ref() else { return };
107 if cached.status != GoalStatus::Active {
108 return;
109 }
110 cached.id.clone()
111 };
112
113 let store = Arc::clone(&self.store);
114
115 spawn_bg(Box::pin(async move {
116 match store.record_turn(&goal_id, turn_tokens).await {
117 Ok(updated) => {
118 tracing::debug!(
119 goal_id = %goal_id,
120 turns_used = updated.turns_used,
121 tokens_used = updated.tokens_used,
122 "goal accounting: turn recorded"
123 );
124 if let (Some(budget), tokens_used) = (
126 updated.token_budget,
127 updated.tokens_used.max(0).cast_unsigned(),
128 ) {
129 let budget = budget.max(0).cast_unsigned();
130 if tokens_used >= budget {
131 tracing::warn!(
132 goal_id = %goal_id,
133 tokens_used,
134 budget,
135 "goal token budget exhausted — auto-pausing"
136 );
137 match store
138 .transition(&goal_id, GoalStatus::Paused, updated.updated_at)
139 .await
140 {
141 Ok(_) => {}
142 Err(GoalError::StaleUpdate(_)) => {
143 tracing::warn!(
144 goal_id = %goal_id,
145 "goal auto-pause skipped: concurrent modification (stale update)"
146 );
147 }
148 Err(e) => {
149 tracing::warn!(goal_id = %goal_id, error = %e, "goal auto-pause failed");
150 }
151 }
152 }
153 }
154 }
155 Err(e) => {
156 tracing::warn!(goal_id = %goal_id, error = %e, "goal accounting: record_turn failed");
157 }
158 }
159 }));
160 }
161
162 pub async fn get_active(&self) -> Result<Option<Goal>, GoalError> {
170 self.store.active().await
171 }
172
173 #[must_use]
175 pub fn get_store(&self) -> Arc<GoalStore> {
176 Arc::clone(&self.store)
177 }
178}
179
180#[cfg(all(test, feature = "sqlite", not(feature = "postgres")))]
181mod tests {
182 use std::sync::Arc;
183
184 use super::*;
185
186 async fn make_store() -> Arc<GoalStore> {
187 let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
188 sqlx::query(
189 "CREATE TABLE zeph_goals (\
190 id TEXT PRIMARY KEY, text TEXT NOT NULL, \
191 status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active','paused','completed','cleared')), \
192 token_budget INTEGER, turns_used INTEGER NOT NULL DEFAULT 0, \
193 tokens_used INTEGER NOT NULL DEFAULT 0, \
194 created_at TEXT NOT NULL, updated_at TEXT NOT NULL, completed_at TEXT)",
195 )
196 .execute(&pool)
197 .await
198 .unwrap();
199 sqlx::query(
200 "CREATE UNIQUE INDEX idx_zeph_goals_single_active ON zeph_goals(status) WHERE status = 'active'",
201 )
202 .execute(&pool)
203 .await
204 .unwrap();
205 Arc::new(GoalStore::new(Arc::new(pool)))
206 }
207
208 #[tokio::test]
209 async fn snapshot_returns_none_when_no_active_goal() {
210 let store = make_store().await;
211 let accounting = GoalAccounting::new(store);
212 assert!(accounting.snapshot().is_none());
213 }
214
215 #[tokio::test]
216 async fn refresh_populates_cache_from_db() {
217 let store = make_store().await;
218 store.create("buy groceries", None, 400).await.unwrap();
219
220 let accounting = GoalAccounting::new(Arc::clone(&store));
221 assert!(accounting.snapshot().is_none(), "cache starts empty");
222
223 accounting.refresh().await.unwrap();
224 let snap = accounting.snapshot().expect("snapshot after refresh");
225 assert_eq!(snap.text, "buy groceries");
226 assert_eq!(snap.status, GoalStatus::Active);
227 }
228
229 #[tokio::test]
230 async fn snapshot_returns_none_for_paused_goal() {
231 let store = make_store().await;
232 let goal = store.create("do thing", None, 400).await.unwrap();
233 store
234 .transition(&goal.id, GoalStatus::Paused, goal.updated_at)
235 .await
236 .unwrap();
237
238 let accounting = GoalAccounting::new(Arc::clone(&store));
239 accounting.refresh().await.unwrap();
240 assert!(accounting.snapshot().is_none());
242 }
243
244 #[tokio::test]
245 async fn on_turn_complete_is_noop_when_cache_empty() {
246 let store = make_store().await;
247 let accounting = GoalAccounting::new(store);
248 let mut called = false;
249 accounting.on_turn_complete(100, |_fut| {
250 called = true;
251 });
252 assert!(!called, "spawn_bg must not be called when no active goal");
253 }
254
255 #[tokio::test]
256 async fn on_turn_complete_spawns_background_task() {
257 let store = make_store().await;
258 store.create("active goal", None, 400).await.unwrap();
259
260 let accounting = GoalAccounting::new(Arc::clone(&store));
261 accounting.refresh().await.unwrap();
262
263 let mut fut_received: Option<
264 std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
265 > = None;
266 accounting.on_turn_complete(500, |fut| {
267 fut_received = Some(fut);
268 });
269 assert!(
270 fut_received.is_some(),
271 "spawn_bg must be called when active goal exists"
272 );
273
274 fut_received.unwrap().await;
276
277 let goals = store.list(10).await.unwrap();
279 let active = goals
280 .iter()
281 .find(|g| g.status == GoalStatus::Active)
282 .unwrap();
283 assert_eq!(active.tokens_used, 500);
284 assert_eq!(active.turns_used, 1);
285 }
286
287 #[tokio::test]
288 async fn auto_pause_on_budget_exhaustion() {
289 let store = make_store().await;
290 store.create("budget goal", Some(100), 400).await.unwrap();
292
293 let accounting = GoalAccounting::new(Arc::clone(&store));
294 accounting.refresh().await.unwrap();
295
296 let mut fut_received = None;
297 accounting.on_turn_complete(200, |fut| {
298 fut_received = Some(fut);
299 });
300 fut_received.unwrap().await;
301
302 let goals = store.list(10).await.unwrap();
304 let goal = goals.first().unwrap();
305 assert_eq!(
306 goal.status,
307 GoalStatus::Paused,
308 "goal must be auto-paused when budget exhausted"
309 );
310 }
311}