Skip to main content

zeph_memory/
reasoning.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `ReasoningBank`: distilled reasoning strategy memory (#3342).
5//!
6//! After each completed agent turn a three-stage async pipeline runs off the hot path:
7//!
8//! 1. **Self-judge** ([`run_self_judge`]) — a fast LLM evaluates success/failure and
9//!    extracts the key reasoning steps.
10//! 2. **Distillation** ([`distill_strategy`]) — a strategy summary (≤ 3 sentences) is
11//!    generated from the reasoning chain, capturing the transferable principle.
12//! 3. **Storage** ([`ReasoningMemory::insert`]) — the summary is written to `SQLite`
13//!    and, when Qdrant is available, embedded and indexed for vector retrieval.
14//!
15//! At context-build time [`ReasoningMemory::retrieve_by_embedding`] fetches top-k
16//! strategies by embedding similarity. The caller (in `zeph-context`) calls
17//! [`ReasoningMemory::mark_used`] only for strategies actually injected into the prompt,
18//! after budget truncation (C4 split from architect plan).
19//!
20//! # LRU eviction
21//!
22//! [`ReasoningMemory::evict_lru`] protects rows with `use_count > HOT_STRATEGY_USE_COUNT`
23//! (default 10) from normal eviction. When all rows are hot and the table exceeds
24//! `2 × store_limit`, a forced eviction pass deletes the oldest rows unconditionally
25//! and emits a `warn!` so operators can tune `store_limit` upward.
26//!
27//! # LRU eviction race note
28//!
29//! Two concurrent turns may race on the count check in `evict_lru`. Either both evict
30//! (over-eviction by at most `top_k` rows) or neither. This is acceptable for MVP —
31//! the table remains bounded.
32
33use std::str::FromStr;
34use std::time::Duration;
35
36use serde::Deserialize;
37use tokio::time::timeout;
38use zeph_db::{ActiveDialect, DbPool, placeholder_list};
39use zeph_llm::any::AnyProvider;
40use zeph_llm::provider::{LlmProvider as _, Message, Role};
41
42use crate::error::MemoryError;
43use crate::vector_store::VectorStore;
44
/// Hot/cold threshold on `use_count` used by LRU eviction.
///
/// A row is "hot" when its `use_count` is strictly greater than this value. The
/// normal eviction pass only deletes rows with `use_count <= HOT_STRATEGY_USE_COUNT`;
/// hot rows are removed only by the forced pass that runs once the table exceeds
/// `2 × store_limit`.
const HOT_STRATEGY_USE_COUNT: i64 = 10;
50
/// Upper bound on the number of ids bound into one `WHERE id IN (...)` list.
///
/// Longer id lists are split into chunks of this size. The value stays below the
/// 999 bound-variable limit that `SQLite` has historically shipped with.
const MAX_IDS_PER_QUERY: usize = 490;
53
/// System prompt sent with every self-judge request.
///
/// Asks the model to decide whether the turn succeeded, to extract the reasoning
/// chain, and to summarize the task, answering with a bare JSON object whose keys
/// match the fields of [`SelfJudgeOutcome`].
const SELF_JUDGE_SYSTEM: &str = "\
You are a task outcome evaluator. Given an agent turn transcript, analyze the conversation and determine:
1. Did the agent successfully complete the user's request? (true/false)
2. Extract the key reasoning steps the agent took (reasoning chain).
3. Summarize the task in one sentence (task hint).

Respond ONLY with valid JSON, no markdown fences, no prose:
{\"success\": bool, \"reasoning_chain\": \"string\", \"task_hint\": \"string\"}";
66
/// System prompt sent with every distillation request.
///
/// Asks the model to compress a reasoning chain into a short, reusable strategy
/// (at most three sentences) and to reply with plain text only.
const DISTILL_SYSTEM: &str = "\
You are a strategy distiller. Given a reasoning chain from an agent turn, distill it into \
a short generalizable strategy (at most 3 sentences) that could help an agent facing a similar \
task. Focus on the transferable principle, not the specific instance. \
Respond with the strategy text only — no headers, no lists, no markdown.";
75
/// Result label attached to a reasoning strategy.
///
/// Persisted through [`Outcome::as_str`] as the text `"success"` or `"failure"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The source turn completed the user's request.
    Success,
    /// The source turn did not complete the user's request.
    Failure,
}

impl Outcome {
    /// Canonical lowercase label for this outcome, as written to the database.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::Failure => "failure",
        }
    }
}
97
98/// Error returned when parsing an [`Outcome`] from a string fails.
99#[derive(Debug, thiserror::Error)]
100#[error("unknown outcome: {0}")]
101pub struct OutcomeParseError(String);
102
103impl FromStr for Outcome {
104    type Err = OutcomeParseError;
105
106    fn from_str(s: &str) -> Result<Self, Self::Err> {
107        match s {
108            "success" => Ok(Outcome::Success),
109            "failure" => Ok(Outcome::Failure),
110            other => {
111                tracing::warn!(
112                    value = other,
113                    "reasoning: unknown outcome, defaulting to Failure"
114                );
115                Ok(Outcome::Failure)
116            }
117        }
118    }
119}
120
121/// A distilled reasoning strategy row from the `reasoning_strategies` table.
122///
123/// Constructed after a successful self-judge + distillation pipeline run.
124/// Persisted in `SQLite` and (when Qdrant is available) indexed as a vector embedding.
125#[derive(Debug, Clone)]
126pub struct ReasoningStrategy {
127    /// UUID v4 primary key.
128    pub id: String,
129    /// Distilled strategy summary (≤ 3 sentences, ≤ 512 chars).
130    pub summary: String,
131    /// Whether the agent succeeded or failed on the source turn.
132    pub outcome: Outcome,
133    /// One-sentence description of the task that produced this strategy.
134    pub task_hint: String,
135    /// Unix timestamp (seconds) when this strategy was created.
136    pub created_at: i64,
137    /// Unix timestamp (seconds) of the last retrieval.
138    pub last_used_at: i64,
139    /// Number of times this strategy has been injected into context.
140    pub use_count: i64,
141    /// Unix timestamp (seconds) when the Qdrant embedding was created.
142    ///
143    /// `None` means this row has not been embedded yet (Qdrant was unavailable at insert time).
144    pub embedded_at: Option<i64>,
145}
146
147/// Parsed response from the self-judge LLM call.
148///
149/// Deserialized from the LLM JSON response in [`run_self_judge`].
150/// The `success` field drives [`Outcome`] selection; `reasoning_chain` and `task_hint`
151/// are forwarded to the distillation step.
152#[derive(Debug, Deserialize)]
153pub struct SelfJudgeOutcome {
154    /// Whether the agent successfully completed the task.
155    pub success: bool,
156    /// Key reasoning steps the agent took, as free-form text.
157    pub reasoning_chain: String,
158    /// One-sentence summary of the task.
159    pub task_hint: String,
160}
161
162/// SQLite-backed store for distilled reasoning strategies.
163///
164/// Attach to [`crate::semantic::SemanticMemory`] via `with_reasoning`.
165/// All write operations are best-effort: `SQLite` errors are propagated as
166/// [`MemoryError`], Qdrant failures are logged and silently ignored.
167pub struct ReasoningMemory {
168    pool: DbPool,
169    /// Optional vector store for embedding-similarity retrieval.
170    ///
171    /// `None` when Qdrant is unavailable; falls back to returning empty results.
172    vector_store: Option<std::sync::Arc<dyn VectorStore>>,
173}
174
/// Name of the vector-store collection that holds reasoning-strategy embeddings.
pub const REASONING_COLLECTION: &str = "reasoning_strategies";
177
178impl ReasoningMemory {
179    /// Create a new `ReasoningMemory` backed by the given `SQLite` pool.
180    ///
181    /// Pass `vector_store = Some(arc)` to enable embedding-similarity retrieval via Qdrant.
182    /// When `None`, [`Self::retrieve_by_embedding`] always returns an empty vec.
183    ///
184    /// # Examples
185    ///
186    /// ```no_run
187    /// use zeph_memory::reasoning::ReasoningMemory;
188    ///
189    /// async fn demo(pool: zeph_db::DbPool) {
190    ///     let memory = ReasoningMemory::new(pool, None);
191    /// }
192    /// ```
193    #[must_use]
194    pub fn new(pool: DbPool, vector_store: Option<std::sync::Arc<dyn VectorStore>>) -> Self {
195        Self { pool, vector_store }
196    }
197
198    /// Insert a new strategy into `SQLite`.
199    ///
200    /// When a `vector_store` is configured, the strategy is also upserted into
201    /// the Qdrant `reasoning_strategies` collection using the provided `embedding`.
202    /// Qdrant failures are logged at `warn` level and do not fail the insert.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the `SQLite` insert fails.
207    #[tracing::instrument(name = "memory.reasoning.insert", skip(self, embedding), fields(id = %strategy.id))]
208    pub async fn insert(
209        &self,
210        strategy: &ReasoningStrategy,
211        embedding: Vec<f32>,
212    ) -> Result<(), MemoryError> {
213        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
214        let raw = format!(
215            "INSERT OR REPLACE INTO reasoning_strategies \
216             (id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at) \
217             VALUES (?, ?, ?, ?, {epoch_now}, {epoch_now}, 0, NULL)"
218        );
219        let sql = zeph_db::rewrite_placeholders(&raw);
220        zeph_db::query(&sql)
221            .bind(&strategy.id)
222            .bind(&strategy.summary)
223            .bind(strategy.outcome.as_str())
224            .bind(&strategy.task_hint)
225            .execute(&self.pool)
226            .await?;
227
228        // Qdrant upsert — best effort: SQLite row already written.
229        if let Some(ref vs) = self.vector_store {
230            let point = crate::vector_store::VectorPoint {
231                id: strategy.id.clone(),
232                vector: embedding,
233                payload: std::collections::HashMap::from([
234                    (
235                        "outcome".to_owned(),
236                        serde_json::Value::String(strategy.outcome.as_str().to_owned()),
237                    ),
238                    (
239                        "task_hint".to_owned(),
240                        serde_json::Value::String(strategy.task_hint.clone()),
241                    ),
242                ]),
243            };
244            if let Err(e) = vs.upsert(REASONING_COLLECTION, vec![point]).await {
245                tracing::warn!(error = %e, id = %strategy.id, "reasoning: Qdrant upsert failed — SQLite-only mode");
246            } else {
247                // Mark embedded_at on success.
248                let update_sql = zeph_db::rewrite_placeholders(&format!(
249                    "UPDATE reasoning_strategies SET embedded_at = {epoch_now} WHERE id = ?"
250                ));
251                if let Err(e) = zeph_db::query(&update_sql)
252                    .bind(&strategy.id)
253                    .execute(&self.pool)
254                    .await
255                {
256                    tracing::warn!(error = %e, "reasoning: failed to set embedded_at");
257                }
258            }
259        }
260
261        tracing::debug!(id = %strategy.id, outcome = strategy.outcome.as_str(), "reasoning: strategy inserted");
262        Ok(())
263    }
264
265    /// Retrieve up to `top_k` strategies by embedding similarity.
266    ///
267    /// This method is **pure** — it does not update `use_count` or `last_used_at`.
268    /// Call [`Self::mark_used`] with the ids of strategies actually injected into the
269    /// prompt (after budget truncation) to maintain accurate retrieval bookkeeping.
270    ///
271    /// Returns an empty vec when no vector store is configured.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the Qdrant search or `SQLite` fetch fails.
276    #[tracing::instrument(
277        name = "memory.reasoning.retrieve_by_embedding",
278        skip(self, embedding),
279        fields(top_k)
280    )]
281    pub async fn retrieve_by_embedding(
282        &self,
283        embedding: &[f32],
284        top_k: u64,
285    ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
286        let Some(ref vs) = self.vector_store else {
287            return Ok(Vec::new());
288        };
289
290        let scored = vs
291            .search(REASONING_COLLECTION, embedding.to_vec(), top_k, None)
292            .await?;
293
294        if scored.is_empty() {
295            return Ok(Vec::new());
296        }
297
298        let ids: Vec<String> = scored.into_iter().map(|p| p.id).collect();
299        self.fetch_by_ids(&ids).await
300    }
301
302    /// Increment `use_count` and update `last_used_at` for each id in the list.
303    ///
304    /// Safe to call with an empty slice — no SQL is issued.
305    /// The list is chunked into batches of [`MAX_IDS_PER_QUERY`] to respect `SQLite`'s
306    /// variable limit.
307    ///
308    /// # Errors
309    ///
310    /// Returns an error if the database update fails.
311    #[tracing::instrument(name = "memory.reasoning.mark_used", skip(self), fields(n = ids.len()))]
312    pub async fn mark_used(&self, ids: &[String]) -> Result<(), MemoryError> {
313        if ids.is_empty() {
314            return Ok(());
315        }
316
317        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
318        for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
319            let ph = placeholder_list(1, chunk.len());
320            // Note: placeholder_list already generates ?1,?2,... (SQLite) or $1,$2,... (postgres).
321            // Do NOT call rewrite_placeholders here — that would corrupt ?1 into $11.
322            let sql = format!(
323                "UPDATE reasoning_strategies \
324                 SET use_count = use_count + 1, last_used_at = {epoch_now} \
325                 WHERE id IN ({ph})"
326            );
327            let mut q = zeph_db::query(&sql);
328            for id in chunk {
329                q = q.bind(id.as_str());
330            }
331            q.execute(&self.pool).await?;
332        }
333
334        Ok(())
335    }
336
337    /// Evict strategies when the table exceeds `store_limit`.
338    ///
339    /// **Normal path**: delete rows with `use_count <= HOT_STRATEGY_USE_COUNT`, oldest
340    /// first, until the table returns to `store_limit`.
341    ///
342    /// **Saturation path**: when the normal path deletes nothing AND the table exceeds
343    /// `2 × store_limit`, bypass hot-row protection and delete oldest rows regardless of
344    /// `use_count`. Emits a `warn!` with the eviction count so operators can tune
345    /// `store_limit` upward or lower the hot threshold.
346    ///
347    /// Returns the number of rows deleted.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if any database operation fails.
352    #[tracing::instrument(name = "memory.reasoning.evict_lru", skip(self), fields(store_limit))]
353    pub async fn evict_lru(&self, store_limit: usize) -> Result<usize, MemoryError> {
354        let count = self.count().await?;
355        if count <= store_limit {
356            return Ok(0);
357        }
358
359        let over_by = count - store_limit;
360        let deleted_cold = self.delete_oldest_cold(over_by).await?;
361        if deleted_cold > 0 {
362            // Also delete from Qdrant best-effort (ids not tracked here — full resync on recovery).
363            tracing::debug!(
364                deleted = deleted_cold,
365                count,
366                "reasoning: evicted cold strategies"
367            );
368            return Ok(deleted_cold);
369        }
370
371        // All rows over limit are hot. Check hard ceiling.
372        let hard_ceiling = store_limit.saturating_mul(2);
373        if count <= hard_ceiling {
374            tracing::debug!(
375                count,
376                store_limit,
377                "reasoning: hot saturation — growth allowed under 2x ceiling"
378            );
379            return Ok(0);
380        }
381
382        // Hard ceiling breached: force-evict oldest rows unconditionally.
383        let forced = count - store_limit;
384        let deleted_forced = self.delete_oldest_unconditional(forced).await?;
385        tracing::warn!(
386            deleted = deleted_forced,
387            count,
388            hard_ceiling,
389            "reasoning: hard-ceiling eviction — evicted hot strategies; consider raising store_limit"
390        );
391
392        Ok(deleted_forced)
393    }
394
395    /// Return the total number of rows in `reasoning_strategies`.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the database query fails.
400    pub async fn count(&self) -> Result<usize, MemoryError> {
401        let row: (i64,) = zeph_db::query_as("SELECT COUNT(*) FROM reasoning_strategies")
402            .fetch_one(&self.pool)
403            .await?;
404        Ok(usize::try_from(row.0.max(0)).unwrap_or(0))
405    }
406
407    // ── private helpers ───────────────────────────────────────────────────────
408
409    /// Fetch strategy rows by their ids in a single `WHERE id IN (...)` query.
410    pub(crate) async fn fetch_by_ids(
411        &self,
412        ids: &[String],
413    ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
414        if ids.is_empty() {
415            return Ok(Vec::new());
416        }
417
418        let mut strategies = Vec::with_capacity(ids.len());
419        for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
420            let ph = placeholder_list(1, chunk.len());
421            // Note: placeholder_list generates DB-specific ?N/$N syntax — do NOT rewite.
422            let sql = format!(
423                "SELECT id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at \
424                 FROM reasoning_strategies WHERE id IN ({ph})"
425            );
426            let mut q = zeph_db::query_as::<
427                _,
428                (String, String, String, String, i64, i64, i64, Option<i64>),
429            >(&sql);
430            for id in chunk {
431                q = q.bind(id.as_str());
432            }
433            let rows = q.fetch_all(&self.pool).await?;
434            for (
435                id,
436                summary,
437                outcome_str,
438                task_hint,
439                created_at,
440                last_used_at,
441                use_count,
442                embedded_at,
443            ) in rows
444            {
445                let outcome = Outcome::from_str(&outcome_str).unwrap_or(Outcome::Failure);
446                strategies.push(ReasoningStrategy {
447                    id,
448                    summary,
449                    outcome,
450                    task_hint,
451                    created_at,
452                    last_used_at,
453                    use_count,
454                    embedded_at,
455                });
456            }
457        }
458
459        Ok(strategies)
460    }
461
462    /// Delete up to `n` cold rows (`use_count <= HOT_STRATEGY_USE_COUNT`), oldest first.
463    ///
464    /// Returns the number of deleted rows.
465    async fn delete_oldest_cold(&self, n: usize) -> Result<usize, MemoryError> {
466        let limit = i64::try_from(n).unwrap_or(i64::MAX);
467        // Use plain `?` + rewrite_placeholders so postgres gets `$1`.
468        let raw = format!(
469            "DELETE FROM reasoning_strategies \
470             WHERE id IN ( \
471               SELECT id FROM reasoning_strategies \
472               WHERE use_count <= {HOT_STRATEGY_USE_COUNT} \
473               ORDER BY last_used_at ASC LIMIT ? \
474             )"
475        );
476        let sql = zeph_db::rewrite_placeholders(&raw);
477        let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
478        Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
479    }
480
481    /// Delete up to `n` rows unconditionally (oldest by `last_used_at`).
482    ///
483    /// Used only for the hard-ceiling saturation path.
484    async fn delete_oldest_unconditional(&self, n: usize) -> Result<usize, MemoryError> {
485        let limit = i64::try_from(n).unwrap_or(i64::MAX);
486        let raw = "DELETE FROM reasoning_strategies \
487                   WHERE id IN ( \
488                     SELECT id FROM reasoning_strategies \
489                     ORDER BY last_used_at ASC LIMIT ? \
490                   )";
491        let sql = zeph_db::rewrite_placeholders(raw);
492        let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
493        Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
494    }
495}
496
497// ── Free functions ────────────────────────────────────────────────────────────
498
499/// Run the self-judge step against a turn's message tail.
500///
501/// Sends the last `messages` slice to the LLM with the self-judge system prompt and
502/// attempts to parse the JSON response into a [`SelfJudgeOutcome`].
503///
504/// Returns `None` on parse failure, timeout, or LLM error — never propagates errors.
505/// Callers should log the `None` case at most at `debug` level.
506///
507/// # Examples
508///
509/// ```no_run
510/// use std::time::Duration;
511/// use zeph_llm::any::AnyProvider;
512/// use zeph_memory::reasoning::run_self_judge;
513///
514/// async fn demo(provider: AnyProvider, messages: &[zeph_llm::provider::Message]) {
515///     let outcome = run_self_judge(&provider, messages, Duration::from_secs(10)).await;
516///     if let Some(o) = outcome {
517///         println!("success={}, hint={}", o.success, o.task_hint);
518///     }
519/// }
520/// ```
521#[tracing::instrument(name = "memory.reasoning.self_judge", skip(provider, messages), fields(n = messages.len()))]
522pub async fn run_self_judge(
523    provider: &AnyProvider,
524    messages: &[Message],
525    extraction_timeout: Duration,
526) -> Option<SelfJudgeOutcome> {
527    if messages.is_empty() {
528        return None;
529    }
530
531    let user_prompt = build_transcript_prompt(messages);
532
533    let llm_messages = [
534        Message::from_legacy(Role::System, SELF_JUDGE_SYSTEM),
535        Message::from_legacy(Role::User, user_prompt),
536    ];
537
538    let response = match timeout(extraction_timeout, provider.chat(&llm_messages)).await {
539        Ok(Ok(text)) => text,
540        Ok(Err(e)) => {
541            tracing::warn!(error = %e, "reasoning: self-judge LLM call failed");
542            return None;
543        }
544        Err(_) => {
545            tracing::warn!("reasoning: self-judge timed out");
546            return None;
547        }
548    };
549
550    parse_self_judge_response(&response)
551}
552
553/// Run the distillation step.
554///
555/// Sends the reasoning chain and outcome label to the LLM and trims the response to
556/// at most 3 sentences and 512 characters.
557///
558/// Returns `None` on LLM error, timeout, or empty response.
559///
560/// # Examples
561///
562/// ```no_run
563/// use std::time::Duration;
564/// use zeph_llm::any::AnyProvider;
565/// use zeph_memory::reasoning::{Outcome, distill_strategy};
566///
567/// async fn demo(provider: AnyProvider) {
568///     let summary = distill_strategy(&provider, Outcome::Success, "tried X, worked", Duration::from_secs(10)).await;
569///     println!("{:?}", summary);
570/// }
571/// ```
572#[tracing::instrument(name = "memory.reasoning.distill", skip(provider, reasoning_chain))]
573pub async fn distill_strategy(
574    provider: &AnyProvider,
575    outcome: Outcome,
576    reasoning_chain: &str,
577    distill_timeout: Duration,
578) -> Option<String> {
579    if reasoning_chain.is_empty() {
580        return None;
581    }
582
583    let user_prompt = format!(
584        "Outcome: {}\n\nReasoning chain:\n{reasoning_chain}",
585        outcome.as_str()
586    );
587
588    let llm_messages = [
589        Message::from_legacy(Role::System, DISTILL_SYSTEM),
590        Message::from_legacy(Role::User, user_prompt),
591    ];
592
593    let response = match timeout(distill_timeout, provider.chat(&llm_messages)).await {
594        Ok(Ok(text)) => text,
595        Ok(Err(e)) => {
596            tracing::warn!(error = %e, "reasoning: distillation LLM call failed");
597            return None;
598        }
599        Err(_) => {
600            tracing::warn!("reasoning: distillation timed out");
601            return None;
602        }
603    };
604
605    let trimmed = trim_to_three_sentences(&response);
606    if trimmed.is_empty() {
607        None
608    } else {
609        Some(trimmed)
610    }
611}
612
/// Tuning knobs for the [`process_turn`] extraction pipeline.
///
/// Bundles the limits and timeouts that are passed unchanged from turn to turn.
#[derive(Debug, Clone, Copy)]
pub struct ProcessTurnConfig {
    /// Row budget for the `reasoning_strategies` table; eviction targets this size.
    pub store_limit: usize,
    /// Time allowed for the self-judge LLM call.
    pub extraction_timeout: Duration,
    /// Time allowed for the distillation LLM call.
    pub distill_timeout: Duration,
    /// How many trailing messages of the turn history are shown to the self-judge
    /// evaluator. A narrow window keeps digest/recap messages from prior sessions
    /// out of the classifier's input. Documented default: `2`.
    pub self_judge_window: usize,
    /// Minimum character count of the last assistant message for self-judge to run.
    /// Shorter replies (greetings, one-word answers) are skipped. Documented
    /// default: `50`.
    pub min_assistant_chars: usize,
}
632
633/// Run the full extraction pipeline for a single turn.
634///
635/// Calls [`run_self_judge`], then [`distill_strategy`], then inserts the result.
636/// `evict_lru` is called when the table exceeds `store_limit`. All errors are
637/// logged at `warn` level and the function returns `Ok(())` so callers never
638/// propagate pipeline failures.
639///
640/// # Errors
641///
642/// Returns an error if the embedding call fails, but not if self-judge or distillation fails.
643#[tracing::instrument(name = "memory.reasoning.process_turn", skip_all)]
644pub async fn process_turn(
645    memory: &ReasoningMemory,
646    extract_provider: &AnyProvider,
647    distill_provider: &AnyProvider,
648    embed_provider: &AnyProvider,
649    messages: &[Message],
650    cfg: ProcessTurnConfig,
651) -> Result<(), MemoryError> {
652    let ProcessTurnConfig {
653        store_limit,
654        extraction_timeout,
655        distill_timeout,
656        self_judge_window,
657        min_assistant_chars,
658    } = cfg;
659
660    // Narrow the message window to reduce noise from session digests and welcome-back
661    // messages that span prior sessions, which can confuse the self-judge classifier.
662    let judge_messages = if messages.len() > self_judge_window {
663        &messages[messages.len() - self_judge_window..]
664    } else {
665        messages
666    };
667
668    // Skip self-judge when the last assistant response is too short to be meaningful.
669    let last_assistant_chars = judge_messages
670        .iter()
671        .rev()
672        .find(|m| m.role == Role::Assistant)
673        .map_or(0, |m| m.content.len());
674    if last_assistant_chars < min_assistant_chars {
675        return Ok(());
676    }
677
678    let Some(outcome) = run_self_judge(extract_provider, judge_messages, extraction_timeout).await
679    else {
680        return Ok(());
681    };
682
683    let outcome_enum = if outcome.success {
684        Outcome::Success
685    } else {
686        Outcome::Failure
687    };
688
689    let Some(summary) = distill_strategy(
690        distill_provider,
691        outcome_enum,
692        &outcome.reasoning_chain,
693        distill_timeout,
694    )
695    .await
696    else {
697        return Ok(());
698    };
699
700    // Embed task_hint + summary for Qdrant retrieval (S2 from architect plan).
701    let embed_input = format!("{}\n{}", outcome.task_hint, summary);
702    let embedding = match embed_provider.embed(&embed_input).await {
703        Ok(v) => v,
704        Err(e) => {
705            tracing::warn!(error = %e, "reasoning: embedding failed — strategy not stored");
706            return Ok(());
707        }
708    };
709
710    let id = uuid::Uuid::new_v4().to_string();
711    let strategy = ReasoningStrategy {
712        id,
713        summary,
714        outcome: outcome_enum,
715        task_hint: outcome.task_hint,
716        created_at: 0, // filled by SQL EPOCH_NOW
717        last_used_at: 0,
718        use_count: 0,
719        embedded_at: None,
720    };
721
722    // P2-2: check count before insert to skip the evict_lru SELECT+DELETE when not needed.
723    // If count is already at or above store_limit, evict after insert. Approximate: two
724    // concurrent inserts can both read the same count and both decide to evict — the
725    // evict_lru implementation is idempotent so over-eviction by ≤1 row is acceptable.
726    let count_before = memory.count().await.unwrap_or(0);
727
728    if let Err(e) = memory.insert(&strategy, embedding).await {
729        tracing::warn!(error = %e, "reasoning: insert failed");
730        return Ok(());
731    }
732
733    if count_before >= store_limit
734        && let Err(e) = memory.evict_lru(store_limit).await
735    {
736        tracing::warn!(error = %e, "reasoning: evict_lru failed");
737    }
738
739    Ok(())
740}
741
742// ── private helpers ───────────────────────────────────────────────────────────
743
/// Per-message character cap applied when building the transcript prompt.
///
/// Keeps the prompt bounded even when the turn history contains long tool outputs
/// or code blocks (S-Med2 fix).
const MAX_TRANSCRIPT_MESSAGE_CHARS: usize = 2000;
749
750/// Build a turn transcript prompt from the message slice.
751///
752/// Each message's content is truncated to [`MAX_TRANSCRIPT_MESSAGE_CHARS`] to bound
753/// the prompt length regardless of tool-output size. Mirrors the
754/// `build_extraction_prompt` format in `trajectory.rs` for consistency.
755fn build_transcript_prompt(messages: &[Message]) -> String {
756    let mut prompt = String::from("Agent turn messages:\n");
757    for (i, msg) in messages.iter().enumerate() {
758        use std::fmt::Write as _;
759        let role = format!("{:?}", msg.role);
760        // Truncate at a char boundary to avoid invalid UTF-8 slices.
761        let content: std::borrow::Cow<str> =
762            if msg.content.chars().count() > MAX_TRANSCRIPT_MESSAGE_CHARS {
763                msg.content
764                    .char_indices()
765                    .nth(MAX_TRANSCRIPT_MESSAGE_CHARS)
766                    .map_or(msg.content.as_str().into(), |(byte_idx, _)| {
767                        msg.content[..byte_idx].into()
768                    })
769            } else {
770                msg.content.as_str().into()
771            };
772        let _ = writeln!(prompt, "[{}] {}: {}", i + 1, role, content);
773    }
774    prompt.push_str("\nEvaluate this turn and return JSON.");
775    prompt
776}
777
778/// Parse the LLM response from the self-judge step into a [`SelfJudgeOutcome`].
779///
780/// Strips markdown code fences, then tries direct parse; on failure, locates the
781/// outermost `{…}` brackets and tries again. Returns `None` on persistent parse failure.
782fn parse_self_judge_response(response: &str) -> Option<SelfJudgeOutcome> {
783    // Strip markdown fences (```json … ```)
784    let stripped = response
785        .trim()
786        .trim_start_matches("```json")
787        .trim_start_matches("```")
788        .trim_end_matches("```")
789        .trim();
790
791    if let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(stripped) {
792        return Some(v);
793    }
794
795    // Try to extract the first `{…}` span.
796    if let (Some(start), Some(end)) = (stripped.find('{'), stripped.rfind('}'))
797        && end > start
798        && let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(&stripped[start..=end])
799    {
800        return Some(v);
801    }
802
803    tracing::warn!(
804        "reasoning: failed to parse self-judge response (len={}): {:.200}",
805        response.len(),
806        response
807    );
808    None
809}
810
/// Trim text to at most 3 sentences and 512 characters.
///
/// A sentence ends at `.`, `!` or `?` when that character is followed by whitespace or is
/// the last character of the whitespace-trimmed input. Once the third such boundary is
/// reached, everything after it is dropped. Input with fewer than three boundaries is kept
/// whole, including a trailing fragment that has no terminator.
///
/// Independently of the sentence count, the result never exceeds 512 characters (not
/// bytes). The cut always lands on a `char` boundary, so multi-byte text is safe.
fn trim_to_three_sentences(text: &str) -> String {
    const MAX_CHARS: usize = 512;
    const MAX_SENTENCES: usize = 3;

    let text = text.trim();
    let mut sentences_seen = 0usize;
    let mut chars_seen = 0usize;
    let mut chars = text.char_indices().peekable();

    while let Some((byte_idx, ch)) = chars.next() {
        // A character beyond the cap exists: cut right before it.
        if chars_seen == MAX_CHARS {
            return text[..byte_idx].to_owned();
        }
        chars_seen += 1;

        if !matches!(ch, '.' | '!' | '?') {
            continue;
        }
        // Punctuation inside a token (e.g. `v1.2`, `...`) does not end a sentence.
        let at_boundary = match chars.peek() {
            Some(&(_, next)) => next.is_whitespace(),
            None => true,
        };
        if at_boundary {
            sentences_seen += 1;
            if sentences_seen == MAX_SENTENCES {
                // Keep the terminator itself; drop everything after it.
                return text[..byte_idx + ch.len_utf8()].to_owned();
            }
        }
    }

    // Fewer than MAX_SENTENCES boundaries and within the cap: nothing to drop.
    text.to_owned()
}
849
#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers (`Outcome`, `parse_self_judge_response`,
    //! `trim_to_three_sentences`), for `ReasoningMemory` against an in-memory `SQLite`
    //! database, and for the LLM-facing steps driven by `MockProvider`.

    use super::*;

    // ── Outcome ────────────────────────────────────────────────────────────────

    #[test]
    fn outcome_as_str_round_trip() {
        assert_eq!(Outcome::Success.as_str(), "success");
        assert_eq!(Outcome::Failure.as_str(), "failure");
        // The string form must parse back to the variant it came from.
        assert_eq!(
            Outcome::from_str(Outcome::Success.as_str()).unwrap(),
            Outcome::Success
        );
        assert_eq!(
            Outcome::from_str(Outcome::Failure.as_str()).unwrap(),
            Outcome::Failure
        );
    }

    #[test]
    fn outcome_from_str_success() {
        assert_eq!(Outcome::from_str("success").unwrap(), Outcome::Success);
    }

    #[test]
    fn outcome_from_str_failure() {
        assert_eq!(Outcome::from_str("failure").unwrap(), Outcome::Failure);
    }

    #[test]
    fn outcome_from_str_unknown_defaults_to_failure() {
        // Unknown values silently map to Failure (forward-compatible).
        assert_eq!(Outcome::from_str("partial").unwrap(), Outcome::Failure);
    }

    // ── parse_self_judge_response ─────────────────────────────────────────────

    #[test]
    fn parse_direct_json() {
        let json = r#"{"success":true,"reasoning_chain":"tried X","task_hint":"do Y"}"#;
        let outcome = parse_self_judge_response(json).unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.reasoning_chain, "tried X");
        assert_eq!(outcome.task_hint, "do Y");
    }

    #[test]
    fn parse_json_with_markdown_fences() {
        let response =
            "```json\n{\"success\":false,\"reasoning_chain\":\"r\",\"task_hint\":\"t\"}\n```";
        let outcome = parse_self_judge_response(response).unwrap();
        assert!(!outcome.success);
    }

    #[test]
    fn parse_json_embedded_in_prose() {
        let response = r#"Here is the evaluation: {"success":true,"reasoning_chain":"chain","task_hint":"hint"} — done."#;
        let outcome = parse_self_judge_response(response).unwrap();
        assert!(outcome.success);
    }

    #[test]
    fn parse_invalid_returns_none() {
        let outcome = parse_self_judge_response("not json at all");
        assert!(outcome.is_none());
    }

    // ── trim_to_three_sentences ───────────────────────────────────────────────

    #[test]
    fn trim_three_sentences_short_text() {
        let text = "One. Two. Three.";
        assert_eq!(trim_to_three_sentences(text), "One. Two. Three.");
    }

    #[test]
    fn trim_three_sentences_truncates_at_third() {
        let text = "One. Two. Three. Four. Five.";
        let result = trim_to_three_sentences(text);
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(!result.contains("Four"));
    }

    #[test]
    fn trim_three_sentences_hard_cap() {
        // 600 chars, no sentence boundaries → capped at exactly 512 chars.
        let long: String = "x".repeat(600);
        let result = trim_to_three_sentences(&long);
        assert_eq!(result.chars().count(), 512);
    }

    #[test]
    fn trim_three_sentences_empty() {
        assert_eq!(trim_to_three_sentences("   "), "");
    }

    // ── ReasoningMemory (in-memory SQLite) ────────────────────────────────────

    /// Open a fresh in-memory `SQLite` pool containing only the `reasoning_strategies` table.
    async fn make_test_pool() -> DbPool {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        sqlx::query(
            "CREATE TABLE reasoning_strategies (
                id           TEXT    PRIMARY KEY NOT NULL,
                summary      TEXT    NOT NULL,
                outcome      TEXT    NOT NULL,
                task_hint    TEXT    NOT NULL,
                created_at   INTEGER NOT NULL DEFAULT (unixepoch('now')),
                last_used_at INTEGER NOT NULL DEFAULT (unixepoch('now')),
                use_count    INTEGER NOT NULL DEFAULT 0,
                embedded_at  INTEGER
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        pool
    }

    /// Build a successful, never-used strategy whose summary and task hint embed `id`.
    fn make_strategy(id: &str) -> ReasoningStrategy {
        ReasoningStrategy {
            id: id.to_owned(),
            summary: format!("Summary for {id}"),
            outcome: Outcome::Success,
            task_hint: format!("Task hint for {id}"),
            created_at: 0,
            last_used_at: 0,
            use_count: 0,
            embedded_at: None,
        }
    }

    /// Insert a strategy and mark it used `HOT_STRATEGY_USE_COUNT + 1` times, so that its
    /// `use_count` exceeds the hot threshold regardless of the constant's value.
    async fn insert_hot_strategy(mem: &ReasoningMemory, id: &str) {
        let id = id.to_owned();
        mem.insert(&make_strategy(&id), vec![]).await.unwrap();
        for _ in 0..=HOT_STRATEGY_USE_COUNT {
            mem.mark_used(std::slice::from_ref(&id)).await.unwrap();
        }
    }

    #[tokio::test]
    async fn insert_and_fetch_by_ids() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        let s = make_strategy("abc-123");
        mem.insert(&s, vec![]).await.unwrap();

        let rows = mem.fetch_by_ids(&["abc-123".to_owned()]).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "abc-123");
        assert_eq!(rows[0].outcome, Outcome::Success);
    }

    #[tokio::test]
    async fn mark_used_increments_count() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        let s = make_strategy("mark-1");
        mem.insert(&s, vec![]).await.unwrap();
        mem.mark_used(&["mark-1".to_owned()]).await.unwrap();
        mem.mark_used(&["mark-1".to_owned()]).await.unwrap();

        let rows = mem.fetch_by_ids(&["mark-1".to_owned()]).await.unwrap();
        assert_eq!(rows[0].use_count, 2);
    }

    #[tokio::test]
    async fn mark_used_empty_is_noop() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);
        // Should not panic or error on empty slice.
        mem.mark_used(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn count_returns_correct_total() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        for i in 0..5 {
            mem.insert(&make_strategy(&format!("s{i}")), vec![])
                .await
                .unwrap();
        }

        assert_eq!(mem.count().await.unwrap(), 5);
    }

    #[tokio::test]
    async fn evict_lru_cold_rows() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 5 cold rows (use_count = 0 by default).
        for i in 0..5 {
            mem.insert(&make_strategy(&format!("cold-{i}")), vec![])
                .await
                .unwrap();
        }

        // Store limit is 3 → should delete 2 oldest.
        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(mem.count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn evict_lru_respects_hot_rows_under_ceiling() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 5 rows whose use_count exceeds HOT_STRATEGY_USE_COUNT.
        for i in 0..5 {
            insert_hot_strategy(&mem, &format!("hot-{i}")).await;
        }

        // store_limit=3, count=5, all hot, 5 < 2*3=6 → under ceiling → no deletion.
        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(mem.count().await.unwrap(), 5);
    }

    #[tokio::test]
    async fn evict_lru_hard_ceiling_forces_deletion() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 7 hot rows. store_limit=3, ceiling=6. 7 > 6 → forced eviction.
        for i in 0..7 {
            insert_hot_strategy(&mem, &format!("hot2-{i}")).await;
        }

        let deleted = mem.evict_lru(3).await.unwrap();
        assert!(deleted > 0, "expected forced deletion");
        let remaining = mem.count().await.unwrap();
        assert_eq!(remaining, 3, "should be trimmed to store_limit");
    }

    #[tokio::test]
    async fn evict_lru_no_op_when_under_limit() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        for i in 0..3 {
            mem.insert(&make_strategy(&format!("s{i}")), vec![])
                .await
                .unwrap();
        }

        // store_limit=10 → count(3) ≤ 10 → no deletion.
        let deleted = mem.evict_lru(10).await.unwrap();
        assert_eq!(deleted, 0);
    }

    // ── mark_used chunked path ────────────────────────────────────────────────

    #[tokio::test]
    async fn mark_used_chunked_over_490_ids() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 500 strategies — exceeds MAX_IDS_PER_QUERY (490) forcing two SQL batches.
        for i in 0..500usize {
            mem.insert(&make_strategy(&format!("chunked-{i}")), vec![])
                .await
                .unwrap();
        }

        let ids: Vec<String> = (0..500usize).map(|i| format!("chunked-{i}")).collect();
        mem.mark_used(&ids).await.unwrap();

        // Spot-check: first and 491st should both have use_count == 1.
        let first = mem.fetch_by_ids(&[ids[0].clone()]).await.unwrap();
        let over_chunk = mem.fetch_by_ids(&[ids[490].clone()]).await.unwrap();
        assert_eq!(first[0].use_count, 1, "first id should have use_count = 1");
        assert_eq!(
            over_chunk[0].use_count, 1,
            "id past the chunk boundary should have use_count = 1"
        );
    }

    // ── run_self_judge malformed response ─────────────────────────────────────

    #[tokio::test]
    async fn run_self_judge_malformed_json_returns_none() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        // with_responses populates the one-shot queue; chat() returns this prose string.
        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
            "This is not JSON at all.".to_string(),
        ]));
        let msgs = vec![Message::from_legacy(Role::User, "hello")];
        let result = run_self_judge(&provider, &msgs, std::time::Duration::from_secs(5)).await;
        assert!(result.is_none(), "malformed LLM response must return None");
    }

    // ── distill_strategy truncation ───────────────────────────────────────────

    #[tokio::test]
    async fn distill_strategy_truncates_to_three_sentences() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let long_response = "One. Two. Three. Four. Five.";
        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
            long_response.to_string(),
        ]));
        let result = distill_strategy(
            &provider,
            Outcome::Success,
            "chain here",
            std::time::Duration::from_secs(5),
        )
        .await
        .unwrap();
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(
            !result.contains("Four"),
            "should not contain 4th sentence: {result}"
        );
    }

    // ── process_turn smoke test ───────────────────────────────────────────────

    #[tokio::test]
    async fn process_turn_with_empty_messages_is_noop() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);
        // MockProvider returns "{}" which parse_self_judge_response will return None for
        // (missing required fields) → Ok(()) with zero inserts.
        let provider = AnyProvider::Mock(MockProvider::default());
        let cfg = ProcessTurnConfig {
            store_limit: 100,
            extraction_timeout: std::time::Duration::from_secs(1),
            distill_timeout: std::time::Duration::from_secs(1),
            self_judge_window: 2,
            min_assistant_chars: 0,
        };
        let result = process_turn(&mem, &provider, &provider, &provider, &[], cfg).await;
        assert!(
            result.is_ok(),
            "process_turn with empty messages must succeed"
        );
        assert_eq!(
            mem.count().await.unwrap(),
            0,
            "no strategies should be stored"
        );
    }
}