1use super::*;
2
3impl Storage {
4 #[allow(clippy::too_many_arguments)]
5 pub fn upsert_governance_proposal(
6 &self,
7 id: &str,
8 chunk_id: &str,
9 proposal_type: &str,
10 reason: &str,
11 evidence_count: i64,
12 evidence_score: f64,
13 actor_count: i64,
14 now: &str,
15 ) -> Result<()> {
16 self.conn.execute(
17 "INSERT INTO governance_proposals
18 (id, chunk_id, proposal_type, reason, evidence_count,
19 evidence_score, actor_count, state, created_at, updated_at)
20 VALUES (?,?,?,?,?,?,?,'pending',?,?)
21 ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
22 DO UPDATE SET reason=excluded.reason,
23 evidence_count=excluded.evidence_count,
24 evidence_score=excluded.evidence_score,
25 actor_count=excluded.actor_count,
26 updated_at=excluded.updated_at",
27 params![
28 id,
29 chunk_id,
30 proposal_type,
31 reason,
32 evidence_count,
33 evidence_score,
34 actor_count,
35 now,
36 now
37 ],
38 )?;
39 Ok(())
40 }
41
42 pub fn list_governance_proposals(&self, state: &str, limit: usize) -> Result<Vec<Value>> {
47 let mut stmt = self.conn.prepare(
48 "SELECT g.id, g.chunk_id, g.proposal_type, g.reason,
49 g.evidence_count, g.evidence_score, g.actor_count,
50 g.state, g.created_at, g.updated_at,
51 c.skill_name, c.seq, c.origin, c.state AS chunk_state,
52 c.confidence, substr(c.content, 1, 280) AS content_preview
53 FROM governance_proposals g
54 LEFT JOIN chunks c ON c.id = g.chunk_id
55 WHERE g.state = :state
56 ORDER BY g.evidence_score DESC, g.updated_at DESC
57 LIMIT :limit",
58 )?;
59 let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
60 let limit_i = limit as i64;
61 let rows = stmt.query_map(
62 rusqlite::named_params! { ":state": state, ":limit": limit_i },
63 |r| row_to_json_with_names(r, &names),
64 )?;
65 let mut out = Vec::new();
66 for row in rows {
67 out.push(row?);
68 }
69 Ok(out)
70 }
71
72 pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
73 self.request_evolve_at(id, reason, now, None)
74 }
75
76 pub fn request_evolve_at(
77 &self,
78 id: &str,
79 reason: &str,
80 now: &str,
81 next_retry_at: Option<&str>,
82 ) -> Result<()> {
83 let priority = match reason {
84 "governance_ready" => 100,
85 "governance" => 80,
86 "threshold" => 60,
87 "distill_retry" => 50,
88 "batch_continue" => 40,
89 _ => 20,
90 };
91 self.conn.execute(
92 "INSERT INTO evolve_requests(
93 id, reason, state, requested_at, priority, next_retry_at
94 )
95 VALUES (?,?,'pending',?,?,?)
96 ON CONFLICT(reason) WHERE state='pending'
97 DO UPDATE SET
98 priority=MAX(priority, excluded.priority),
99 next_retry_at=CASE
100 WHEN excluded.next_retry_at IS NULL THEN NULL
101 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
102 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
103 END",
104 params![id, reason, now, priority, next_retry_at],
105 )?;
106 Ok(())
107 }
108
109 pub fn claim_evolve_request_with_reason(
110 &self,
111 now: &str,
112 stale_before: &str,
113 ) -> Result<Option<EvolveRequestClaim>> {
114 self.conn.execute(
115 "UPDATE evolve_requests
116 SET state='pending', leased_at=NULL, note='lease_recovered'
117 WHERE state='running' AND leased_at < ?",
118 [stale_before],
119 )?;
120 self.conn.execute(
121 "UPDATE evolve_requests
122 SET state='pending', leased_at=NULL, note='retry_failed'
123 WHERE state='failed' AND attempts < 3
124 AND COALESCE(next_retry_at, completed_at) < ?",
125 [now],
126 )?;
127 Ok(self
128 .conn
129 .query_row(
130 "UPDATE evolve_requests
131 SET state='running', leased_at=?, attempts=attempts+1
132 WHERE id=(
133 SELECT id FROM evolve_requests
134 WHERE state='pending'
135 AND (next_retry_at IS NULL OR next_retry_at <= ?)
136 ORDER BY priority DESC, requested_at ASC LIMIT 1
137 ) AND state='pending'
138 RETURNING id, reason",
139 params![now, now],
140 |row| {
141 Ok(EvolveRequestClaim {
142 id: row.get(0)?,
143 reason: row.get(1)?,
144 })
145 },
146 )
147 .optional()?)
148 }
149
150 pub fn defer_evolve_request(&self, id: &str, note: &str, next_retry_at: &str) -> Result<()> {
151 self.conn.execute(
152 "DELETE FROM evolve_requests
153 WHERE state='pending'
154 AND id!=?
155 AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
156 params![id, id],
157 )?;
158 self.conn.execute(
159 "UPDATE evolve_requests
160 SET state='pending', leased_at=NULL, completed_at=NULL,
161 note=?, next_retry_at=?
162 WHERE id=?",
163 params![note, next_retry_at, id],
164 )?;
165 Ok(())
166 }
167
168 pub fn finish_evolve_request(
169 &self,
170 id: &str,
171 state: &str,
172 note: Option<&str>,
173 now: &str,
174 ) -> Result<()> {
175 self.conn.execute(
176 "UPDATE evolve_requests
177 SET state=?, completed_at=?, note=?,
178 last_failed_at=CASE
179 WHEN ?='failed' THEN ?
180 ELSE last_failed_at
181 END,
182 next_retry_at=CASE WHEN ?='failed'
183 THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
184 ELSE NULL END
185 WHERE id=?",
186 params![state, now, note, state, now, state, now, id],
187 )?;
188 Ok(())
189 }
190
191 pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
192 self.conn.execute(
193 "UPDATE evolve_requests
194 SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
195 WHERE state='pending' AND requested_at <= ?",
196 params![now, requested_before],
197 )?;
198 Ok(())
199 }
200
201 pub fn update_episodic_log_state_by_id(
203 &self,
204 id: &str,
205 state: &str,
206 note: Option<&str>,
207 outcome: Option<&str>,
208 ) -> Result<()> {
209 self.conn.execute(
210 "UPDATE episodic_log
211 SET distill_state=?, distill_note=COALESCE(?,distill_note),
212 outcome=COALESCE(?,outcome),
213 distill_run_id=NULL, distill_locked_at=NULL
214 WHERE id=?",
215 params![state, note, outcome, id],
216 )?;
217 Ok(())
218 }
219
220 #[allow(clippy::too_many_arguments)]
221 pub fn finish_distill_log(
222 &self,
223 id: &str,
224 state: &str,
225 note: Option<&str>,
226 prompt_tokens: i64,
227 completion_tokens: i64,
228 accounted_at: &str,
229 ) -> Result<()> {
230 self.conn.execute(
231 "INSERT INTO distill_token_usage(
232 log_id, prompt_tokens, completion_tokens, outcome, accounted_at
233 ) VALUES (?,?,?,?,?)",
234 params![id, prompt_tokens, completion_tokens, state, accounted_at],
235 )?;
236 self.conn.execute(
237 "UPDATE episodic_log
238 SET distill_state=?, distill_note=?,
239 distill_prompt_tokens=?, distill_completion_tokens=?,
240 distill_accounted_at=?,
241 distill_attempts=distill_attempts
242 + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
243 distill_last_failed_at=CASE
244 WHEN ?='failed' THEN ?
245 ELSE distill_last_failed_at
246 END,
247 distill_run_id=NULL, distill_locked_at=NULL
248 WHERE id=?",
249 params![
250 state,
251 note,
252 prompt_tokens,
253 completion_tokens,
254 accounted_at,
255 state,
256 state,
257 accounted_at,
258 id
259 ],
260 )?;
261 Ok(())
262 }
263
264 pub fn claim_distill_batch(
267 &self,
268 run_id: &str,
269 limit: usize,
270 locked_at: &str,
271 ) -> Result<Vec<Value>> {
272 self.conn.execute(
274 "UPDATE episodic_log
275 SET distill_state='screening', distill_run_id=?, distill_locked_at=?
276 WHERE id IN (
277 SELECT id FROM episodic_log
278 WHERE distill_state='new'
279 ORDER BY priority DESC, ts ASC
280 LIMIT ?
281 )",
282 params![run_id, locked_at, limit as i64],
283 )?;
284 self.query_json(
285 "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
286 params![run_id],
287 )
288 }
289
290 pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
291 self.query_json(
292 "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
293 params![limit as i64],
294 )
295 }
296}