1use super::*;
2
3impl Storage {
4 #[allow(clippy::too_many_arguments)]
5 pub fn upsert_governance_proposal(
6 &self,
7 id: &str,
8 chunk_id: &str,
9 proposal_type: &str,
10 reason: &str,
11 evidence_count: i64,
12 evidence_score: f64,
13 actor_count: i64,
14 now: &str,
15 ) -> Result<()> {
16 self.conn.execute(
17 "INSERT INTO governance_proposals
18 (id, chunk_id, proposal_type, reason, evidence_count,
19 evidence_score, actor_count, state, created_at, updated_at)
20 VALUES (?,?,?,?,?,?,?,'pending',?,?)
21 ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
22 DO UPDATE SET reason=excluded.reason,
23 evidence_count=excluded.evidence_count,
24 evidence_score=excluded.evidence_score,
25 actor_count=excluded.actor_count,
26 updated_at=excluded.updated_at",
27 params![
28 id,
29 chunk_id,
30 proposal_type,
31 reason,
32 evidence_count,
33 evidence_score,
34 actor_count,
35 now,
36 now
37 ],
38 )?;
39 Ok(())
40 }
41
42 pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
43 self.request_evolve_at(id, reason, now, None)
44 }
45
46 pub fn request_evolve_at(
47 &self,
48 id: &str,
49 reason: &str,
50 now: &str,
51 next_retry_at: Option<&str>,
52 ) -> Result<()> {
53 let priority = match reason {
54 "governance_ready" => 100,
55 "governance" => 80,
56 "threshold" => 60,
57 "distill_retry" => 50,
58 "batch_continue" => 40,
59 _ => 20,
60 };
61 self.conn.execute(
62 "INSERT INTO evolve_requests(
63 id, reason, state, requested_at, priority, next_retry_at
64 )
65 VALUES (?,?,'pending',?,?,?)
66 ON CONFLICT(reason) WHERE state='pending'
67 DO UPDATE SET
68 priority=MAX(priority, excluded.priority),
69 next_retry_at=CASE
70 WHEN excluded.next_retry_at IS NULL THEN NULL
71 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
72 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
73 END",
74 params![id, reason, now, priority, next_retry_at],
75 )?;
76 Ok(())
77 }
78
79 pub fn claim_evolve_request_with_reason(
80 &self,
81 now: &str,
82 stale_before: &str,
83 ) -> Result<Option<EvolveRequestClaim>> {
84 self.conn.execute(
85 "UPDATE evolve_requests
86 SET state='pending', leased_at=NULL, note='lease_recovered'
87 WHERE state='running' AND leased_at < ?",
88 [stale_before],
89 )?;
90 self.conn.execute(
91 "UPDATE evolve_requests
92 SET state='pending', leased_at=NULL, note='retry_failed'
93 WHERE state='failed' AND attempts < 3
94 AND COALESCE(next_retry_at, completed_at) < ?",
95 [now],
96 )?;
97 Ok(self
98 .conn
99 .query_row(
100 "UPDATE evolve_requests
101 SET state='running', leased_at=?, attempts=attempts+1
102 WHERE id=(
103 SELECT id FROM evolve_requests
104 WHERE state='pending'
105 AND (next_retry_at IS NULL OR next_retry_at <= ?)
106 ORDER BY priority DESC, requested_at ASC LIMIT 1
107 ) AND state='pending'
108 RETURNING id, reason",
109 params![now, now],
110 |row| {
111 Ok(EvolveRequestClaim {
112 id: row.get(0)?,
113 reason: row.get(1)?,
114 })
115 },
116 )
117 .optional()?)
118 }
119
120 pub fn defer_evolve_request(&self, id: &str, note: &str, next_retry_at: &str) -> Result<()> {
121 self.conn.execute(
122 "DELETE FROM evolve_requests
123 WHERE state='pending'
124 AND id!=?
125 AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
126 params![id, id],
127 )?;
128 self.conn.execute(
129 "UPDATE evolve_requests
130 SET state='pending', leased_at=NULL, completed_at=NULL,
131 note=?, next_retry_at=?
132 WHERE id=?",
133 params![note, next_retry_at, id],
134 )?;
135 Ok(())
136 }
137
138 pub fn finish_evolve_request(
139 &self,
140 id: &str,
141 state: &str,
142 note: Option<&str>,
143 now: &str,
144 ) -> Result<()> {
145 self.conn.execute(
146 "UPDATE evolve_requests
147 SET state=?, completed_at=?, note=?,
148 last_failed_at=CASE
149 WHEN ?='failed' THEN ?
150 ELSE last_failed_at
151 END,
152 next_retry_at=CASE WHEN ?='failed'
153 THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
154 ELSE NULL END
155 WHERE id=?",
156 params![state, now, note, state, now, state, now, id],
157 )?;
158 Ok(())
159 }
160
161 pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
162 self.conn.execute(
163 "UPDATE evolve_requests
164 SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
165 WHERE state='pending' AND requested_at <= ?",
166 params![now, requested_before],
167 )?;
168 Ok(())
169 }
170
171 pub fn update_episodic_log_state_by_id(
173 &self,
174 id: &str,
175 state: &str,
176 note: Option<&str>,
177 outcome: Option<&str>,
178 ) -> Result<()> {
179 self.conn.execute(
180 "UPDATE episodic_log
181 SET distill_state=?, distill_note=COALESCE(?,distill_note),
182 outcome=COALESCE(?,outcome),
183 distill_run_id=NULL, distill_locked_at=NULL
184 WHERE id=?",
185 params![state, note, outcome, id],
186 )?;
187 Ok(())
188 }
189
190 #[allow(clippy::too_many_arguments)]
191 pub fn finish_distill_log(
192 &self,
193 id: &str,
194 state: &str,
195 note: Option<&str>,
196 prompt_tokens: i64,
197 completion_tokens: i64,
198 accounted_at: &str,
199 ) -> Result<()> {
200 self.conn.execute(
201 "INSERT INTO distill_token_usage(
202 log_id, prompt_tokens, completion_tokens, outcome, accounted_at
203 ) VALUES (?,?,?,?,?)",
204 params![id, prompt_tokens, completion_tokens, state, accounted_at],
205 )?;
206 self.conn.execute(
207 "UPDATE episodic_log
208 SET distill_state=?, distill_note=?,
209 distill_prompt_tokens=?, distill_completion_tokens=?,
210 distill_accounted_at=?,
211 distill_attempts=distill_attempts
212 + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
213 distill_last_failed_at=CASE
214 WHEN ?='failed' THEN ?
215 ELSE distill_last_failed_at
216 END,
217 distill_run_id=NULL, distill_locked_at=NULL
218 WHERE id=?",
219 params![
220 state,
221 note,
222 prompt_tokens,
223 completion_tokens,
224 accounted_at,
225 state,
226 state,
227 accounted_at,
228 id
229 ],
230 )?;
231 Ok(())
232 }
233
234 pub fn claim_distill_batch(
237 &self,
238 run_id: &str,
239 limit: usize,
240 locked_at: &str,
241 ) -> Result<Vec<Value>> {
242 self.conn.execute(
244 "UPDATE episodic_log
245 SET distill_state='screening', distill_run_id=?, distill_locked_at=?
246 WHERE id IN (
247 SELECT id FROM episodic_log
248 WHERE distill_state='new'
249 ORDER BY priority DESC, ts ASC
250 LIMIT ?
251 )",
252 params![run_id, locked_at, limit as i64],
253 )?;
254 self.query_json(
255 "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
256 params![run_id],
257 )
258 }
259
260 pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
261 self.query_json(
262 "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
263 params![limit as i64],
264 )
265 }
266}