Skip to main content

innate_core/storage/
evolution.rs

1use super::*;
2
3impl Storage {
4    #[allow(clippy::too_many_arguments)]
5    pub fn upsert_governance_proposal(
6        &self,
7        id: &str,
8        chunk_id: &str,
9        proposal_type: &str,
10        reason: &str,
11        evidence_count: i64,
12        evidence_score: f64,
13        actor_count: i64,
14        now: &str,
15    ) -> Result<()> {
16        self.conn.execute(
17            "INSERT INTO governance_proposals
18             (id, chunk_id, proposal_type, reason, evidence_count,
19              evidence_score, actor_count, state, created_at, updated_at)
20             VALUES (?,?,?,?,?,?,?,'pending',?,?)
21             ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
22             DO UPDATE SET reason=excluded.reason,
23                           evidence_count=excluded.evidence_count,
24                           evidence_score=excluded.evidence_score,
25                           actor_count=excluded.actor_count,
26                           updated_at=excluded.updated_at",
27            params![
28                id,
29                chunk_id,
30                proposal_type,
31                reason,
32                evidence_count,
33                evidence_score,
34                actor_count,
35                now,
36                now
37            ],
38        )?;
39        Ok(())
40    }
41
42    /// Read-only listing of governance proposals for the web review queue,
43    /// joined with a compact chunk projection so a reviewer sees *what* was
44    /// flagged and *why* (evidence_score / actor_count) in one round-trip.
45    /// Ordered by evidence_score DESC so the strongest cases surface first.
46    pub fn list_governance_proposals(&self, state: &str, limit: usize) -> Result<Vec<Value>> {
47        let mut stmt = self.conn.prepare(
48            "SELECT g.id, g.chunk_id, g.proposal_type, g.reason,
49                    g.evidence_count, g.evidence_score, g.actor_count,
50                    g.state, g.created_at, g.updated_at,
51                    c.skill_name, c.seq, c.origin, c.state AS chunk_state,
52                    c.confidence, substr(c.content, 1, 280) AS content_preview
53             FROM governance_proposals g
54             LEFT JOIN chunks c ON c.id = g.chunk_id
55             WHERE g.state = :state
56             ORDER BY g.evidence_score DESC, g.updated_at DESC
57             LIMIT :limit",
58        )?;
59        let names: Vec<String> = stmt.column_names().into_iter().map(String::from).collect();
60        let limit_i = limit as i64;
61        let rows = stmt.query_map(
62            rusqlite::named_params! { ":state": state, ":limit": limit_i },
63            |r| row_to_json_with_names(r, &names),
64        )?;
65        let mut out = Vec::new();
66        for row in rows {
67            out.push(row?);
68        }
69        Ok(out)
70    }
71
72    pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
73        self.request_evolve_at(id, reason, now, None)
74    }
75
76    pub fn request_evolve_at(
77        &self,
78        id: &str,
79        reason: &str,
80        now: &str,
81        next_retry_at: Option<&str>,
82    ) -> Result<()> {
83        let priority = match reason {
84            "governance_ready" => 100,
85            "governance" => 80,
86            "threshold" => 60,
87            "distill_retry" => 50,
88            "batch_continue" => 40,
89            _ => 20,
90        };
91        self.conn.execute(
92            "INSERT INTO evolve_requests(
93               id, reason, state, requested_at, priority, next_retry_at
94             )
95             VALUES (?,?,'pending',?,?,?)
96             ON CONFLICT(reason) WHERE state='pending'
97             DO UPDATE SET
98               priority=MAX(priority, excluded.priority),
99               next_retry_at=CASE
100                 WHEN excluded.next_retry_at IS NULL THEN NULL
101                 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
102                 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
103               END",
104            params![id, reason, now, priority, next_retry_at],
105        )?;
106        Ok(())
107    }
108
109    pub fn claim_evolve_request_with_reason(
110        &self,
111        now: &str,
112        stale_before: &str,
113    ) -> Result<Option<EvolveRequestClaim>> {
114        self.conn.execute(
115            "UPDATE evolve_requests
116             SET state='pending', leased_at=NULL, note='lease_recovered'
117             WHERE state='running' AND leased_at < ?",
118            [stale_before],
119        )?;
120        self.conn.execute(
121            "UPDATE evolve_requests
122             SET state='pending', leased_at=NULL, note='retry_failed'
123             WHERE state='failed' AND attempts < 3
124               AND COALESCE(next_retry_at, completed_at) < ?",
125            [now],
126        )?;
127        Ok(self
128            .conn
129            .query_row(
130                "UPDATE evolve_requests
131             SET state='running', leased_at=?, attempts=attempts+1
132             WHERE id=(
133               SELECT id FROM evolve_requests
134               WHERE state='pending'
135                 AND (next_retry_at IS NULL OR next_retry_at <= ?)
136               ORDER BY priority DESC, requested_at ASC LIMIT 1
137             ) AND state='pending'
138             RETURNING id, reason",
139                params![now, now],
140                |row| {
141                    Ok(EvolveRequestClaim {
142                        id: row.get(0)?,
143                        reason: row.get(1)?,
144                    })
145                },
146            )
147            .optional()?)
148    }
149
150    pub fn defer_evolve_request(&self, id: &str, note: &str, next_retry_at: &str) -> Result<()> {
151        self.conn.execute(
152            "DELETE FROM evolve_requests
153             WHERE state='pending'
154               AND id!=?
155               AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
156            params![id, id],
157        )?;
158        self.conn.execute(
159            "UPDATE evolve_requests
160             SET state='pending', leased_at=NULL, completed_at=NULL,
161                 note=?, next_retry_at=?
162             WHERE id=?",
163            params![note, next_retry_at, id],
164        )?;
165        Ok(())
166    }
167
168    pub fn finish_evolve_request(
169        &self,
170        id: &str,
171        state: &str,
172        note: Option<&str>,
173        now: &str,
174    ) -> Result<()> {
175        self.conn.execute(
176            "UPDATE evolve_requests
177             SET state=?, completed_at=?, note=?,
178                 last_failed_at=CASE
179                   WHEN ?='failed' THEN ?
180                   ELSE last_failed_at
181                 END,
182                 next_retry_at=CASE WHEN ?='failed'
183                   THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
184                   ELSE NULL END
185             WHERE id=?",
186            params![state, now, note, state, now, state, now, id],
187        )?;
188        Ok(())
189    }
190
191    pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
192        self.conn.execute(
193            "UPDATE evolve_requests
194             SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
195             WHERE state='pending' AND requested_at <= ?",
196            params![now, requested_before],
197        )?;
198        Ok(())
199    }
200
201    /// Update by primary-key id (used after distill where we have the row id, not trace_id).
202    pub fn update_episodic_log_state_by_id(
203        &self,
204        id: &str,
205        state: &str,
206        note: Option<&str>,
207        outcome: Option<&str>,
208    ) -> Result<()> {
209        self.conn.execute(
210            "UPDATE episodic_log
211             SET distill_state=?, distill_note=COALESCE(?,distill_note),
212                 outcome=COALESCE(?,outcome),
213                 distill_run_id=NULL, distill_locked_at=NULL
214             WHERE id=?",
215            params![state, note, outcome, id],
216        )?;
217        Ok(())
218    }
219
220    #[allow(clippy::too_many_arguments)]
221    pub fn finish_distill_log(
222        &self,
223        id: &str,
224        state: &str,
225        note: Option<&str>,
226        prompt_tokens: i64,
227        completion_tokens: i64,
228        accounted_at: &str,
229    ) -> Result<()> {
230        self.conn.execute(
231            "INSERT INTO distill_token_usage(
232               log_id, prompt_tokens, completion_tokens, outcome, accounted_at
233             ) VALUES (?,?,?,?,?)",
234            params![id, prompt_tokens, completion_tokens, state, accounted_at],
235        )?;
236        self.conn.execute(
237            "UPDATE episodic_log
238             SET distill_state=?, distill_note=?,
239                 distill_prompt_tokens=?, distill_completion_tokens=?,
240                 distill_accounted_at=?,
241                 distill_attempts=distill_attempts
242                   + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
243                 distill_last_failed_at=CASE
244                   WHEN ?='failed' THEN ?
245                   ELSE distill_last_failed_at
246                 END,
247                 distill_run_id=NULL, distill_locked_at=NULL
248             WHERE id=?",
249            params![
250                state,
251                note,
252                prompt_tokens,
253                completion_tokens,
254                accounted_at,
255                state,
256                state,
257                accounted_at,
258                id
259            ],
260        )?;
261        Ok(())
262    }
263
264    /// Claim a batch of 'new' logs for distillation: mark them 'screening' atomically.
265    /// Returns the claimed rows (with distill_run_id set to run_id).
266    pub fn claim_distill_batch(
267        &self,
268        run_id: &str,
269        limit: usize,
270        locked_at: &str,
271    ) -> Result<Vec<Value>> {
272        // BEGIN IMMEDIATE must be held by caller; this is called inside a transaction.
273        self.conn.execute(
274            "UPDATE episodic_log
275             SET distill_state='screening', distill_run_id=?, distill_locked_at=?
276             WHERE id IN (
277               SELECT id FROM episodic_log
278               WHERE distill_state='new'
279               ORDER BY priority DESC, ts ASC
280               LIMIT ?
281             )",
282            params![run_id, locked_at, limit as i64],
283        )?;
284        self.query_json(
285            "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
286            params![run_id],
287        )
288    }
289
290    pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
291        self.query_json(
292            "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
293            params![limit as i64],
294        )
295    }
296}