Skip to main content

innate_core/storage/
evolution.rs

1use super::*;
2
3impl Storage {
4    #[allow(clippy::too_many_arguments)]
5    pub fn upsert_governance_proposal(
6        &self,
7        id: &str,
8        chunk_id: &str,
9        proposal_type: &str,
10        reason: &str,
11        evidence_count: i64,
12        evidence_score: f64,
13        actor_count: i64,
14        now: &str,
15    ) -> Result<()> {
16        self.conn.execute(
17            "INSERT INTO governance_proposals
18             (id, chunk_id, proposal_type, reason, evidence_count,
19              evidence_score, actor_count, state, created_at, updated_at)
20             VALUES (?,?,?,?,?,?,?,'pending',?,?)
21             ON CONFLICT(chunk_id, proposal_type) WHERE state='pending'
22             DO UPDATE SET reason=excluded.reason,
23                           evidence_count=excluded.evidence_count,
24                           evidence_score=excluded.evidence_score,
25                           actor_count=excluded.actor_count,
26                           updated_at=excluded.updated_at",
27            params![
28                id,
29                chunk_id,
30                proposal_type,
31                reason,
32                evidence_count,
33                evidence_score,
34                actor_count,
35                now,
36                now
37            ],
38        )?;
39        Ok(())
40    }
41
42    pub fn request_evolve(&self, id: &str, reason: &str, now: &str) -> Result<()> {
43        self.request_evolve_at(id, reason, now, None)
44    }
45
46    pub fn request_evolve_at(
47        &self,
48        id: &str,
49        reason: &str,
50        now: &str,
51        next_retry_at: Option<&str>,
52    ) -> Result<()> {
53        let priority = match reason {
54            "governance_ready" => 100,
55            "governance" => 80,
56            "threshold" => 60,
57            "distill_retry" => 50,
58            "batch_continue" => 40,
59            _ => 20,
60        };
61        self.conn.execute(
62            "INSERT INTO evolve_requests(
63               id, reason, state, requested_at, priority, next_retry_at
64             )
65             VALUES (?,?,'pending',?,?,?)
66             ON CONFLICT(reason) WHERE state='pending'
67             DO UPDATE SET
68               priority=MAX(priority, excluded.priority),
69               next_retry_at=CASE
70                 WHEN excluded.next_retry_at IS NULL THEN NULL
71                 WHEN evolve_requests.next_retry_at IS NULL THEN NULL
72                 ELSE MIN(evolve_requests.next_retry_at, excluded.next_retry_at)
73               END",
74            params![id, reason, now, priority, next_retry_at],
75        )?;
76        Ok(())
77    }
78
79    pub fn claim_evolve_request_with_reason(
80        &self,
81        now: &str,
82        stale_before: &str,
83    ) -> Result<Option<EvolveRequestClaim>> {
84        self.conn.execute(
85            "UPDATE evolve_requests
86             SET state='pending', leased_at=NULL, note='lease_recovered'
87             WHERE state='running' AND leased_at < ?",
88            [stale_before],
89        )?;
90        self.conn.execute(
91            "UPDATE evolve_requests
92             SET state='pending', leased_at=NULL, note='retry_failed'
93             WHERE state='failed' AND attempts < 3
94               AND COALESCE(next_retry_at, completed_at) < ?",
95            [now],
96        )?;
97        Ok(self
98            .conn
99            .query_row(
100                "UPDATE evolve_requests
101             SET state='running', leased_at=?, attempts=attempts+1
102             WHERE id=(
103               SELECT id FROM evolve_requests
104               WHERE state='pending'
105                 AND (next_retry_at IS NULL OR next_retry_at <= ?)
106               ORDER BY priority DESC, requested_at ASC LIMIT 1
107             ) AND state='pending'
108             RETURNING id, reason",
109                params![now, now],
110                |row| {
111                    Ok(EvolveRequestClaim {
112                        id: row.get(0)?,
113                        reason: row.get(1)?,
114                    })
115                },
116            )
117            .optional()?)
118    }
119
120    pub fn defer_evolve_request(&self, id: &str, note: &str, next_retry_at: &str) -> Result<()> {
121        self.conn.execute(
122            "DELETE FROM evolve_requests
123             WHERE state='pending'
124               AND id!=?
125               AND reason=(SELECT reason FROM evolve_requests WHERE id=?)",
126            params![id, id],
127        )?;
128        self.conn.execute(
129            "UPDATE evolve_requests
130             SET state='pending', leased_at=NULL, completed_at=NULL,
131                 note=?, next_retry_at=?
132             WHERE id=?",
133            params![note, next_retry_at, id],
134        )?;
135        Ok(())
136    }
137
138    pub fn finish_evolve_request(
139        &self,
140        id: &str,
141        state: &str,
142        note: Option<&str>,
143        now: &str,
144    ) -> Result<()> {
145        self.conn.execute(
146            "UPDATE evolve_requests
147             SET state=?, completed_at=?, note=?,
148                 last_failed_at=CASE
149                   WHEN ?='failed' THEN ?
150                   ELSE last_failed_at
151                 END,
152                 next_retry_at=CASE WHEN ?='failed'
153                   THEN strftime('%Y-%m-%dT%H:%M:%fZ', ?, '+5 minutes')
154                   ELSE NULL END
155             WHERE id=?",
156            params![state, now, note, state, now, state, now, id],
157        )?;
158        Ok(())
159    }
160
161    pub fn finish_covered_evolve_requests(&self, requested_before: &str, now: &str) -> Result<()> {
162        self.conn.execute(
163            "UPDATE evolve_requests
164             SET state='completed', completed_at=?, note='covered_by_evolve', next_retry_at=NULL
165             WHERE state='pending' AND requested_at <= ?",
166            params![now, requested_before],
167        )?;
168        Ok(())
169    }
170
171    /// Update by primary-key id (used after distill where we have the row id, not trace_id).
172    pub fn update_episodic_log_state_by_id(
173        &self,
174        id: &str,
175        state: &str,
176        note: Option<&str>,
177        outcome: Option<&str>,
178    ) -> Result<()> {
179        self.conn.execute(
180            "UPDATE episodic_log
181             SET distill_state=?, distill_note=COALESCE(?,distill_note),
182                 outcome=COALESCE(?,outcome),
183                 distill_run_id=NULL, distill_locked_at=NULL
184             WHERE id=?",
185            params![state, note, outcome, id],
186        )?;
187        Ok(())
188    }
189
190    #[allow(clippy::too_many_arguments)]
191    pub fn finish_distill_log(
192        &self,
193        id: &str,
194        state: &str,
195        note: Option<&str>,
196        prompt_tokens: i64,
197        completion_tokens: i64,
198        accounted_at: &str,
199    ) -> Result<()> {
200        self.conn.execute(
201            "INSERT INTO distill_token_usage(
202               log_id, prompt_tokens, completion_tokens, outcome, accounted_at
203             ) VALUES (?,?,?,?,?)",
204            params![id, prompt_tokens, completion_tokens, state, accounted_at],
205        )?;
206        self.conn.execute(
207            "UPDATE episodic_log
208             SET distill_state=?, distill_note=?,
209                 distill_prompt_tokens=?, distill_completion_tokens=?,
210                 distill_accounted_at=?,
211                 distill_attempts=distill_attempts
212                   + CASE WHEN ?='failed' THEN 1 ELSE 0 END,
213                 distill_last_failed_at=CASE
214                   WHEN ?='failed' THEN ?
215                   ELSE distill_last_failed_at
216                 END,
217                 distill_run_id=NULL, distill_locked_at=NULL
218             WHERE id=?",
219            params![
220                state,
221                note,
222                prompt_tokens,
223                completion_tokens,
224                accounted_at,
225                state,
226                state,
227                accounted_at,
228                id
229            ],
230        )?;
231        Ok(())
232    }
233
234    /// Claim a batch of 'new' logs for distillation: mark them 'screening' atomically.
235    /// Returns the claimed rows (with distill_run_id set to run_id).
236    pub fn claim_distill_batch(
237        &self,
238        run_id: &str,
239        limit: usize,
240        locked_at: &str,
241    ) -> Result<Vec<Value>> {
242        // BEGIN IMMEDIATE must be held by caller; this is called inside a transaction.
243        self.conn.execute(
244            "UPDATE episodic_log
245             SET distill_state='screening', distill_run_id=?, distill_locked_at=?
246             WHERE id IN (
247               SELECT id FROM episodic_log
248               WHERE distill_state='new'
249               ORDER BY priority DESC, ts ASC
250               LIMIT ?
251             )",
252            params![run_id, locked_at, limit as i64],
253        )?;
254        self.query_json(
255            "SELECT * FROM episodic_log WHERE distill_run_id=? AND distill_state='screening'",
256            params![run_id],
257        )
258    }
259
260    pub fn query_episodic_logs_open(&self, limit: usize) -> Result<Vec<Value>> {
261        self.query_json(
262            "SELECT * FROM episodic_log WHERE distill_state='new' ORDER BY priority DESC, ts ASC LIMIT ?",
263            params![limit as i64],
264        )
265    }
266}