Skip to main content

zeph_memory/
reasoning.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! `ReasoningBank`: distilled reasoning strategy memory (#3342).
5//!
6//! After each completed agent turn a three-stage async pipeline runs off the hot path:
7//!
8//! 1. **Self-judge** ([`run_self_judge`]) — a fast LLM evaluates success/failure and
9//!    extracts the key reasoning steps.
10//! 2. **Distillation** ([`distill_strategy`]) — a strategy summary (≤ 3 sentences) is
11//!    generated from the reasoning chain, capturing the transferable principle.
12//! 3. **Storage** ([`ReasoningMemory::insert`]) — the summary is written to `SQLite`
13//!    and, when Qdrant is available, embedded and indexed for vector retrieval.
14//!
15//! At context-build time [`ReasoningMemory::retrieve_by_embedding`] fetches top-k
16//! strategies by embedding similarity. The caller (in `zeph-context`) calls
17//! [`ReasoningMemory::mark_used`] only for strategies actually injected into the prompt,
18//! after budget truncation (C4 split from architect plan).
19//!
20//! # LRU eviction
21//!
22//! [`ReasoningMemory::evict_lru`] protects rows with `use_count > HOT_STRATEGY_USE_COUNT`
23//! (default 10) from normal eviction. When all rows are hot and the table exceeds
24//! `2 × store_limit`, a forced eviction pass deletes the oldest rows unconditionally
25//! and emits a `warn!` so operators can tune `store_limit` upward.
26//!
27//! # LRU eviction race note
28//!
29//! Two concurrent turns may race on the count check in `evict_lru`. Either both evict
30//! (over-eviction by at most `top_k` rows) or neither. This is acceptable for MVP —
31//! the table remains bounded.
32
33use std::str::FromStr;
34use std::time::Duration;
35
36use serde::Deserialize;
37use tokio::time::timeout;
38use zeph_db::{ActiveDialect, DbPool, placeholder_list};
39use zeph_llm::any::AnyProvider;
40use zeph_llm::provider::{LlmProvider as _, Message, Role};
41
42use crate::error::MemoryError;
43use crate::vector_store::VectorStore;
44
/// Retrieval-count threshold above which a strategy is treated as "hot".
///
/// Rows whose `use_count` is strictly greater than this value are passed over by the
/// normal cold-eviction query; they are only removed by the forced pass that runs
/// once the table grows beyond `2 × store_limit`.
const HOT_STRATEGY_USE_COUNT: i64 = 10;
50
/// Upper bound on the number of ids bound into one `WHERE id IN (...)` statement.
///
/// Stays below the 999-variable limit of `SQLite` bind lists; longer id lists are
/// processed in chunks of this size.
const MAX_IDS_PER_QUERY: usize = 490;
53
/// System prompt used by the self-judge LLM call.
///
/// Asks the model to decide whether the turn succeeded, to extract the reasoning
/// chain, and to summarize the task, answering with a bare JSON object whose keys
/// match the fields of [`SelfJudgeOutcome`].
const SELF_JUDGE_SYSTEM: &str = r#"You are a task outcome evaluator. Given an agent turn transcript, analyze the conversation and determine:
1. Did the agent successfully complete the user's request? (true/false)
2. Extract the key reasoning steps the agent took (reasoning chain).
3. Summarize the task in one sentence (task hint).

Respond ONLY with valid JSON, no markdown fences, no prose:
{"success": bool, "reasoning_chain": "string", "task_hint": "string"}"#;
66
/// System prompt used by the distillation LLM call.
///
/// Asks the model to compress a reasoning chain into a short, generalizable strategy
/// of at most three sentences, returned as plain text. The prompt is a single line;
/// it is assembled from fragments here only to keep the source lines short.
const DISTILL_SYSTEM: &str = concat!(
    "You are a strategy distiller. Given a reasoning chain from an agent turn, distill it into ",
    "a short generalizable strategy (at most 3 sentences) that could help an agent facing a similar ",
    "task. Focus on the transferable principle, not the specific instance. ",
    "Respond with the strategy text only — no headers, no lists, no markdown.",
);
75
/// Result label attached to a reasoning strategy: did the source turn succeed or fail?
///
/// Persisted in a `TEXT NOT NULL` column as `"success"` or `"failure"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Outcome {
    /// The agent completed the task.
    Success,
    /// The agent did not complete the task.
    Failure,
}

impl Outcome {
    /// Canonical lowercase label for this outcome, as written to the database.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::Failure => "failure",
        }
    }
}
98
99/// Error returned when parsing an [`Outcome`] from a string fails.
100#[derive(Debug, thiserror::Error)]
101#[error("unknown outcome: {0}")]
102pub struct OutcomeParseError(String);
103
104impl FromStr for Outcome {
105    type Err = OutcomeParseError;
106
107    fn from_str(s: &str) -> Result<Self, Self::Err> {
108        match s {
109            "success" => Ok(Outcome::Success),
110            "failure" => Ok(Outcome::Failure),
111            other => {
112                tracing::warn!(
113                    value = other,
114                    "reasoning: unknown outcome, defaulting to Failure"
115                );
116                Ok(Outcome::Failure)
117            }
118        }
119    }
120}
121
122/// A distilled reasoning strategy row from the `reasoning_strategies` table.
123///
124/// Constructed after a successful self-judge + distillation pipeline run.
125/// Persisted in `SQLite` and (when Qdrant is available) indexed as a vector embedding.
126#[derive(Debug, Clone)]
127pub struct ReasoningStrategy {
128    /// UUID v4 primary key.
129    pub id: String,
130    /// Distilled strategy summary (≤ 3 sentences, ≤ 512 chars).
131    pub summary: String,
132    /// Whether the agent succeeded or failed on the source turn.
133    pub outcome: Outcome,
134    /// One-sentence description of the task that produced this strategy.
135    pub task_hint: String,
136    /// Unix timestamp (seconds) when this strategy was created.
137    pub created_at: i64,
138    /// Unix timestamp (seconds) of the last retrieval.
139    pub last_used_at: i64,
140    /// Number of times this strategy has been injected into context.
141    pub use_count: i64,
142    /// Unix timestamp (seconds) when the Qdrant embedding was created.
143    ///
144    /// `None` means this row has not been embedded yet (Qdrant was unavailable at insert time).
145    pub embedded_at: Option<i64>,
146}
147
148/// Parsed response from the self-judge LLM call.
149///
150/// Deserialized from the LLM JSON response in [`run_self_judge`].
151/// The `success` field drives [`Outcome`] selection; `reasoning_chain` and `task_hint`
152/// are forwarded to the distillation step.
153#[derive(Debug, Deserialize)]
154pub struct SelfJudgeOutcome {
155    /// Whether the agent successfully completed the task.
156    pub success: bool,
157    /// Key reasoning steps the agent took, as free-form text.
158    pub reasoning_chain: String,
159    /// One-sentence summary of the task.
160    pub task_hint: String,
161}
162
163/// SQLite-backed store for distilled reasoning strategies.
164///
165/// Attach to [`crate::semantic::SemanticMemory`] via `with_reasoning`.
166/// All write operations are best-effort: `SQLite` errors are propagated as
167/// [`MemoryError`], Qdrant failures are logged and silently ignored.
168pub struct ReasoningMemory {
169    pool: DbPool,
170    /// Optional vector store for embedding-similarity retrieval.
171    ///
172    /// `None` when Qdrant is unavailable; falls back to returning empty results.
173    vector_store: Option<std::sync::Arc<dyn VectorStore>>,
174}
175
/// Name of the vector-store collection that holds reasoning-strategy embeddings.
pub const REASONING_COLLECTION: &str = "reasoning_strategies";
178
179impl ReasoningMemory {
180    /// Create a new `ReasoningMemory` backed by the given `SQLite` pool.
181    ///
182    /// Pass `vector_store = Some(arc)` to enable embedding-similarity retrieval via Qdrant.
183    /// When `None`, [`Self::retrieve_by_embedding`] always returns an empty vec.
184    ///
185    /// # Examples
186    ///
187    /// ```no_run
188    /// use zeph_memory::reasoning::ReasoningMemory;
189    ///
190    /// async fn demo(pool: zeph_db::DbPool) {
191    ///     let memory = ReasoningMemory::new(pool, None);
192    /// }
193    /// ```
194    #[must_use]
195    pub fn new(pool: DbPool, vector_store: Option<std::sync::Arc<dyn VectorStore>>) -> Self {
196        Self { pool, vector_store }
197    }
198
199    /// Insert a new strategy into `SQLite`.
200    ///
201    /// When a `vector_store` is configured, the strategy is also upserted into
202    /// the Qdrant `reasoning_strategies` collection using the provided `embedding`.
203    /// Qdrant failures are logged at `warn` level and do not fail the insert.
204    ///
205    /// # Errors
206    ///
207    /// Returns an error if the `SQLite` insert fails.
208    #[tracing::instrument(name = "memory.reasoning.insert", skip(self, embedding), fields(id = %strategy.id))]
209    pub async fn insert(
210        &self,
211        strategy: &ReasoningStrategy,
212        embedding: Vec<f32>,
213    ) -> Result<(), MemoryError> {
214        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
215        let raw = format!(
216            "INSERT INTO reasoning_strategies \
217             (id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at) \
218             VALUES (?, ?, ?, ?, {epoch_now}, {epoch_now}, 0, NULL) \
219             ON CONFLICT (id) DO UPDATE SET \
220               summary = EXCLUDED.summary, \
221               outcome = EXCLUDED.outcome, \
222               task_hint = EXCLUDED.task_hint, \
223               last_used_at = EXCLUDED.last_used_at, \
224               embedded_at = EXCLUDED.embedded_at"
225        );
226        let sql = zeph_db::rewrite_placeholders(&raw);
227        zeph_db::query(&sql)
228            .bind(&strategy.id)
229            .bind(&strategy.summary)
230            .bind(strategy.outcome.as_str())
231            .bind(&strategy.task_hint)
232            .execute(&self.pool)
233            .await?;
234
235        // Qdrant upsert — best effort: SQLite row already written.
236        if let Some(ref vs) = self.vector_store {
237            let point = crate::vector_store::VectorPoint {
238                id: strategy.id.clone(),
239                vector: embedding,
240                payload: std::collections::HashMap::from([
241                    (
242                        "outcome".to_owned(),
243                        serde_json::Value::String(strategy.outcome.as_str().to_owned()),
244                    ),
245                    (
246                        "task_hint".to_owned(),
247                        serde_json::Value::String(strategy.task_hint.clone()),
248                    ),
249                ]),
250            };
251            if let Err(e) = vs.upsert(REASONING_COLLECTION, vec![point]).await {
252                tracing::warn!(error = %e, id = %strategy.id, "reasoning: Qdrant upsert failed — SQLite-only mode");
253            } else {
254                // Mark embedded_at on success.
255                let update_sql = zeph_db::rewrite_placeholders(&format!(
256                    "UPDATE reasoning_strategies SET embedded_at = {epoch_now} WHERE id = ?"
257                ));
258                if let Err(e) = zeph_db::query(&update_sql)
259                    .bind(&strategy.id)
260                    .execute(&self.pool)
261                    .await
262                {
263                    tracing::warn!(error = %e, "reasoning: failed to set embedded_at");
264                }
265            }
266        }
267
268        tracing::debug!(id = %strategy.id, outcome = strategy.outcome.as_str(), "reasoning: strategy inserted");
269        Ok(())
270    }
271
272    /// Retrieve up to `top_k` strategies by embedding similarity.
273    ///
274    /// This method is **pure** — it does not update `use_count` or `last_used_at`.
275    /// Call [`Self::mark_used`] with the ids of strategies actually injected into the
276    /// prompt (after budget truncation) to maintain accurate retrieval bookkeeping.
277    ///
278    /// Returns an empty vec when no vector store is configured.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the Qdrant search or `SQLite` fetch fails.
283    #[tracing::instrument(
284        name = "memory.reasoning.retrieve_by_embedding",
285        skip(self, embedding),
286        fields(top_k)
287    )]
288    pub async fn retrieve_by_embedding(
289        &self,
290        embedding: &[f32],
291        top_k: u64,
292    ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
293        let Some(ref vs) = self.vector_store else {
294            return Ok(Vec::new());
295        };
296
297        let scored = vs
298            .search(REASONING_COLLECTION, embedding.to_vec(), top_k, None)
299            .await?;
300
301        if scored.is_empty() {
302            return Ok(Vec::new());
303        }
304
305        let ids: Vec<String> = scored.into_iter().map(|p| p.id).collect();
306        self.fetch_by_ids(&ids).await
307    }
308
309    /// Increment `use_count` and update `last_used_at` for each id in the list.
310    ///
311    /// Safe to call with an empty slice — no SQL is issued.
312    /// The list is chunked into batches of [`MAX_IDS_PER_QUERY`] to respect `SQLite`'s
313    /// variable limit.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if the database update fails.
318    #[tracing::instrument(name = "memory.reasoning.mark_used", skip(self), fields(n = ids.len()))]
319    pub async fn mark_used(&self, ids: &[String]) -> Result<(), MemoryError> {
320        if ids.is_empty() {
321            return Ok(());
322        }
323
324        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
325        for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
326            let ph = placeholder_list(1, chunk.len());
327            // Note: placeholder_list already generates ?1,?2,... (SQLite) or $1,$2,... (postgres).
328            // Do NOT call rewrite_placeholders here — that would corrupt ?1 into $11.
329            let sql = format!(
330                "UPDATE reasoning_strategies \
331                 SET use_count = use_count + 1, last_used_at = {epoch_now} \
332                 WHERE id IN ({ph})"
333            );
334            let mut q = zeph_db::query(&sql);
335            for id in chunk {
336                q = q.bind(id.as_str());
337            }
338            q.execute(&self.pool).await?;
339        }
340
341        Ok(())
342    }
343
344    /// Evict strategies when the table exceeds `store_limit`.
345    ///
346    /// **Normal path**: delete rows with `use_count <= HOT_STRATEGY_USE_COUNT`, oldest
347    /// first, until the table returns to `store_limit`.
348    ///
349    /// **Saturation path**: when the normal path deletes nothing AND the table exceeds
350    /// `2 × store_limit`, bypass hot-row protection and delete oldest rows regardless of
351    /// `use_count`. Emits a `warn!` with the eviction count so operators can tune
352    /// `store_limit` upward or lower the hot threshold.
353    ///
354    /// Returns the number of rows deleted.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if any database operation fails.
359    #[tracing::instrument(name = "memory.reasoning.evict_lru", skip(self), fields(store_limit))]
360    pub async fn evict_lru(&self, store_limit: usize) -> Result<usize, MemoryError> {
361        let count = self.count().await?;
362        if count <= store_limit {
363            return Ok(0);
364        }
365
366        let over_by = count - store_limit;
367        let deleted_cold = self.delete_oldest_cold(over_by).await?;
368        if deleted_cold > 0 {
369            // Also delete from Qdrant best-effort (ids not tracked here — full resync on recovery).
370            tracing::debug!(
371                deleted = deleted_cold,
372                count,
373                "reasoning: evicted cold strategies"
374            );
375            return Ok(deleted_cold);
376        }
377
378        // All rows over limit are hot. Check hard ceiling.
379        let hard_ceiling = store_limit.saturating_mul(2);
380        if count <= hard_ceiling {
381            tracing::debug!(
382                count,
383                store_limit,
384                "reasoning: hot saturation — growth allowed under 2x ceiling"
385            );
386            return Ok(0);
387        }
388
389        // Hard ceiling breached: force-evict oldest rows unconditionally.
390        let forced = count - store_limit;
391        let deleted_forced = self.delete_oldest_unconditional(forced).await?;
392        tracing::warn!(
393            deleted = deleted_forced,
394            count,
395            hard_ceiling,
396            "reasoning: hard-ceiling eviction — evicted hot strategies; consider raising store_limit"
397        );
398
399        Ok(deleted_forced)
400    }
401
402    /// Return the total number of rows in `reasoning_strategies`.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the database query fails.
407    pub async fn count(&self) -> Result<usize, MemoryError> {
408        let row: (i64,) = zeph_db::query_as("SELECT COUNT(*) FROM reasoning_strategies")
409            .fetch_one(&self.pool)
410            .await?;
411        Ok(usize::try_from(row.0.max(0)).unwrap_or(0))
412    }
413
414    // ── private helpers ───────────────────────────────────────────────────────
415
416    /// Fetch strategy rows by their ids in a single `WHERE id IN (...)` query.
417    pub(crate) async fn fetch_by_ids(
418        &self,
419        ids: &[String],
420    ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
421        if ids.is_empty() {
422            return Ok(Vec::new());
423        }
424
425        let mut strategies = Vec::with_capacity(ids.len());
426        for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
427            let ph = placeholder_list(1, chunk.len());
428            // Note: placeholder_list generates DB-specific ?N/$N syntax — do NOT rewite.
429            let sql = format!(
430                "SELECT id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at \
431                 FROM reasoning_strategies WHERE id IN ({ph})"
432            );
433            let mut q = zeph_db::query_as::<
434                _,
435                (String, String, String, String, i64, i64, i64, Option<i64>),
436            >(&sql);
437            for id in chunk {
438                q = q.bind(id.as_str());
439            }
440            let rows = q.fetch_all(&self.pool).await?;
441            for (
442                id,
443                summary,
444                outcome_str,
445                task_hint,
446                created_at,
447                last_used_at,
448                use_count,
449                embedded_at,
450            ) in rows
451            {
452                let outcome = Outcome::from_str(&outcome_str).unwrap_or(Outcome::Failure);
453                strategies.push(ReasoningStrategy {
454                    id,
455                    summary,
456                    outcome,
457                    task_hint,
458                    created_at,
459                    last_used_at,
460                    use_count,
461                    embedded_at,
462                });
463            }
464        }
465
466        Ok(strategies)
467    }
468
469    /// Delete up to `n` cold rows (`use_count <= HOT_STRATEGY_USE_COUNT`), oldest first.
470    ///
471    /// Returns the number of deleted rows.
472    async fn delete_oldest_cold(&self, n: usize) -> Result<usize, MemoryError> {
473        let limit = i64::try_from(n).unwrap_or(i64::MAX);
474        // Use plain `?` + rewrite_placeholders so postgres gets `$1`.
475        let raw = format!(
476            "DELETE FROM reasoning_strategies \
477             WHERE id IN ( \
478               SELECT id FROM reasoning_strategies \
479               WHERE use_count <= {HOT_STRATEGY_USE_COUNT} \
480               ORDER BY last_used_at ASC LIMIT ? \
481             )"
482        );
483        let sql = zeph_db::rewrite_placeholders(&raw);
484        let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
485        Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
486    }
487
488    /// Delete up to `n` rows unconditionally (oldest by `last_used_at`).
489    ///
490    /// Used only for the hard-ceiling saturation path.
491    async fn delete_oldest_unconditional(&self, n: usize) -> Result<usize, MemoryError> {
492        let limit = i64::try_from(n).unwrap_or(i64::MAX);
493        let raw = "DELETE FROM reasoning_strategies \
494                   WHERE id IN ( \
495                     SELECT id FROM reasoning_strategies \
496                     ORDER BY last_used_at ASC LIMIT ? \
497                   )";
498        let sql = zeph_db::rewrite_placeholders(raw);
499        let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
500        Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
501    }
502}
503
504// ── Free functions ────────────────────────────────────────────────────────────
505
506/// Run the self-judge step against a turn's message tail.
507///
508/// Sends the last `messages` slice to the LLM with the self-judge system prompt and
509/// attempts to parse the JSON response into a [`SelfJudgeOutcome`].
510///
511/// Returns `None` on parse failure, timeout, or LLM error — never propagates errors.
512/// Callers should log the `None` case at most at `debug` level.
513///
514/// # Examples
515///
516/// ```no_run
517/// use std::time::Duration;
518/// use zeph_llm::any::AnyProvider;
519/// use zeph_memory::reasoning::run_self_judge;
520///
521/// async fn demo(provider: AnyProvider, messages: &[zeph_llm::provider::Message]) {
522///     let outcome = run_self_judge(&provider, messages, Duration::from_secs(10)).await;
523///     if let Some(o) = outcome {
524///         println!("success={}, hint={}", o.success, o.task_hint);
525///     }
526/// }
527/// ```
528#[tracing::instrument(name = "memory.reasoning.self_judge", skip(provider, messages), fields(n = messages.len()))]
529pub async fn run_self_judge(
530    provider: &AnyProvider,
531    messages: &[Message],
532    extraction_timeout: Duration,
533) -> Option<SelfJudgeOutcome> {
534    if messages.is_empty() {
535        return None;
536    }
537
538    let user_prompt = build_transcript_prompt(messages);
539
540    let llm_messages = [
541        Message::from_legacy(Role::System, SELF_JUDGE_SYSTEM),
542        Message::from_legacy(Role::User, user_prompt),
543    ];
544
545    let response = match timeout(extraction_timeout, provider.chat(&llm_messages)).await {
546        Ok(Ok(text)) => text,
547        Ok(Err(e)) => {
548            tracing::warn!(error = %e, "reasoning: self-judge LLM call failed");
549            return None;
550        }
551        Err(_) => {
552            tracing::warn!("reasoning: self-judge timed out");
553            return None;
554        }
555    };
556
557    parse_self_judge_response(&response)
558}
559
560/// Run the distillation step.
561///
562/// Sends the reasoning chain and outcome label to the LLM and trims the response to
563/// at most 3 sentences and 512 characters.
564///
565/// Returns `None` on LLM error, timeout, or empty response.
566///
567/// # Examples
568///
569/// ```no_run
570/// use std::time::Duration;
571/// use zeph_llm::any::AnyProvider;
572/// use zeph_memory::reasoning::{Outcome, distill_strategy};
573///
574/// async fn demo(provider: AnyProvider) {
575///     let summary = distill_strategy(&provider, Outcome::Success, "tried X, worked", Duration::from_secs(10)).await;
576///     println!("{:?}", summary);
577/// }
578/// ```
579#[tracing::instrument(name = "memory.reasoning.distill", skip(provider, reasoning_chain))]
580pub async fn distill_strategy(
581    provider: &AnyProvider,
582    outcome: Outcome,
583    reasoning_chain: &str,
584    distill_timeout: Duration,
585) -> Option<String> {
586    if reasoning_chain.is_empty() {
587        return None;
588    }
589
590    let user_prompt = format!(
591        "Outcome: {}\n\nReasoning chain:\n{reasoning_chain}",
592        outcome.as_str()
593    );
594
595    let llm_messages = [
596        Message::from_legacy(Role::System, DISTILL_SYSTEM),
597        Message::from_legacy(Role::User, user_prompt),
598    ];
599
600    let response = match timeout(distill_timeout, provider.chat(&llm_messages)).await {
601        Ok(Ok(text)) => text,
602        Ok(Err(e)) => {
603            tracing::warn!(error = %e, "reasoning: distillation LLM call failed");
604            return None;
605        }
606        Err(_) => {
607            tracing::warn!("reasoning: distillation timed out");
608            return None;
609        }
610    };
611
612    let trimmed = trim_to_three_sentences(&response);
613    if trimmed.is_empty() {
614        None
615    } else {
616        Some(trimmed)
617    }
618}
619
/// Tunables for the [`process_turn`] extraction pipeline.
///
/// Bundles the limits and timeouts that stay the same from one turn to the next.
/// The defaults quoted below are nominal values; no `Default` impl is defined here.
#[derive(Debug, Clone, Copy)]
pub struct ProcessTurnConfig {
    /// Row budget for the `reasoning_strategies` table; eviction aims to stay at it.
    pub store_limit: usize,
    /// Deadline for the self-judge LLM call.
    pub extraction_timeout: Duration,
    /// Deadline for the distillation LLM call.
    pub distill_timeout: Duration,
    /// Deadline for each `embed()` call made by the pipeline. Default: 5 s.
    pub embed_timeout: Duration,
    /// How many of the most recent messages are handed to the self-judge evaluator.
    ///
    /// A narrow window keeps digest/recap messages from earlier sessions out of the
    /// classifier's view. Default: `2`.
    pub self_judge_window: usize,
    /// Minimum character count of the last assistant message for self-judge to run.
    ///
    /// Shorter replies (greetings, one-word answers) are skipped. Default: `50`.
    pub min_assistant_chars: usize,
}
641
642/// Run the full extraction pipeline for a single turn.
643///
644/// Calls [`run_self_judge`], then [`distill_strategy`], then inserts the result.
645/// `evict_lru` is called when the table exceeds `store_limit`. All errors are
646/// logged at `warn` level and the function returns `Ok(())` so callers never
647/// propagate pipeline failures.
648///
649/// # Errors
650///
651/// Returns an error if the embedding call fails, but not if self-judge or distillation fails.
652#[tracing::instrument(name = "memory.reasoning.process_turn", skip_all)]
653pub async fn process_turn(
654    memory: &ReasoningMemory,
655    extract_provider: &AnyProvider,
656    distill_provider: &AnyProvider,
657    embed_provider: &AnyProvider,
658    messages: &[Message],
659    cfg: ProcessTurnConfig,
660) -> Result<(), MemoryError> {
661    let ProcessTurnConfig {
662        store_limit,
663        extraction_timeout,
664        distill_timeout,
665        embed_timeout,
666        self_judge_window,
667        min_assistant_chars,
668    } = cfg;
669
670    // Narrow the message window to reduce noise from session digests and welcome-back
671    // messages that span prior sessions, which can confuse the self-judge classifier.
672    let judge_messages = if messages.len() > self_judge_window {
673        &messages[messages.len() - self_judge_window..]
674    } else {
675        messages
676    };
677
678    // Skip self-judge when the last assistant response is too short to be meaningful.
679    let last_assistant_chars = judge_messages
680        .iter()
681        .rev()
682        .find(|m| m.role == Role::Assistant)
683        .map_or(0, |m| m.content.len());
684    if last_assistant_chars < min_assistant_chars {
685        return Ok(());
686    }
687
688    let Some(outcome) = run_self_judge(extract_provider, judge_messages, extraction_timeout).await
689    else {
690        return Ok(());
691    };
692
693    let outcome_enum = if outcome.success {
694        Outcome::Success
695    } else {
696        Outcome::Failure
697    };
698
699    let Some(summary) = distill_strategy(
700        distill_provider,
701        outcome_enum,
702        &outcome.reasoning_chain,
703        distill_timeout,
704    )
705    .await
706    else {
707        return Ok(());
708    };
709
710    // Embed task_hint + summary for Qdrant retrieval (S2 from architect plan).
711    let embed_input = format!("{}\n{}", outcome.task_hint, summary);
712    let embedding =
713        match tokio::time::timeout(embed_timeout, embed_provider.embed(&embed_input)).await {
714            Ok(Ok(v)) => v,
715            Ok(Err(e)) => {
716                tracing::warn!(error = %e, "reasoning: embedding failed — strategy not stored");
717                return Ok(());
718            }
719            Err(_) => {
720                tracing::warn!("reasoning: embed timed out — strategy not stored");
721                return Ok(());
722            }
723        };
724
725    let id = uuid::Uuid::new_v4().to_string();
726    let strategy = ReasoningStrategy {
727        id,
728        summary,
729        outcome: outcome_enum,
730        task_hint: outcome.task_hint,
731        created_at: 0, // filled by SQL EPOCH_NOW
732        last_used_at: 0,
733        use_count: 0,
734        embedded_at: None,
735    };
736
737    // P2-2: check count before insert to skip the evict_lru SELECT+DELETE when not needed.
738    // If count is already at or above store_limit, evict after insert. Approximate: two
739    // concurrent inserts can both read the same count and both decide to evict — the
740    // evict_lru implementation is idempotent so over-eviction by ≤1 row is acceptable.
741    let count_before = memory.count().await.unwrap_or(0);
742
743    if let Err(e) = memory.insert(&strategy, embedding).await {
744        tracing::warn!(error = %e, "reasoning: insert failed");
745        return Ok(());
746    }
747
748    if count_before >= store_limit
749        && let Err(e) = memory.evict_lru(store_limit).await
750    {
751        tracing::warn!(error = %e, "reasoning: evict_lru failed");
752    }
753
754    Ok(())
755}
756
757// ── private helpers ───────────────────────────────────────────────────────────
758
/// Per-message character cap applied when rendering the transcript prompt.
///
/// Keeps the prompt bounded even when the turn history contains long tool outputs
/// or code blocks (S-Med2 fix).
const MAX_TRANSCRIPT_MESSAGE_CHARS: usize = 2000;
764
765/// Build a turn transcript prompt from the message slice.
766///
767/// Each message's content is truncated to [`MAX_TRANSCRIPT_MESSAGE_CHARS`] to bound
768/// the prompt length regardless of tool-output size. Mirrors the
769/// `build_extraction_prompt` format in `trajectory.rs` for consistency.
770fn build_transcript_prompt(messages: &[Message]) -> String {
771    let mut prompt = String::from("Agent turn messages:\n");
772    for (i, msg) in messages.iter().enumerate() {
773        use std::fmt::Write as _;
774        let role = format!("{:?}", msg.role);
775        // Truncate at a char boundary to avoid invalid UTF-8 slices.
776        let content: std::borrow::Cow<str> =
777            if msg.content.chars().count() > MAX_TRANSCRIPT_MESSAGE_CHARS {
778                msg.content
779                    .char_indices()
780                    .nth(MAX_TRANSCRIPT_MESSAGE_CHARS)
781                    .map_or(msg.content.as_str().into(), |(byte_idx, _)| {
782                        msg.content[..byte_idx].into()
783                    })
784            } else {
785                msg.content.as_str().into()
786            };
787        let _ = writeln!(prompt, "[{}] {}: {}", i + 1, role, content);
788    }
789    prompt.push_str("\nEvaluate this turn and return JSON.");
790    prompt
791}
792
793/// Parse the LLM response from the self-judge step into a [`SelfJudgeOutcome`].
794///
795/// Strips markdown code fences, then tries direct parse; on failure, locates the
796/// outermost `{…}` brackets and tries again. Returns `None` on persistent parse failure.
797fn parse_self_judge_response(response: &str) -> Option<SelfJudgeOutcome> {
798    // Strip markdown fences (```json … ```)
799    let stripped = response
800        .trim()
801        .trim_start_matches("```json")
802        .trim_start_matches("```")
803        .trim_end_matches("```")
804        .trim();
805
806    if let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(stripped) {
807        return Some(v);
808    }
809
810    // Try to extract the first `{…}` span.
811    if let (Some(start), Some(end)) = (stripped.find('{'), stripped.rfind('}'))
812        && end > start
813        && let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(&stripped[start..=end])
814    {
815        return Some(v);
816    }
817
818    tracing::warn!(
819        "reasoning: failed to parse self-judge response (len={}): {:.200}",
820        response.len(),
821        response
822    );
823    None
824}
825
/// Trim text to at most 3 sentences and 512 characters.
///
/// Leading and trailing whitespace is removed first. A sentence boundary is a `.`, `!`
/// or `?` that is followed by whitespace or by the end of the text, so `v1.2` and the
/// inner dots of `...` do not count.
///
/// * If a third sentence boundary is found within the first 512 characters, the result
///   ends right after it.
/// * Otherwise the text is kept as-is, including a trailing sentence that has no
///   terminator, and is only subject to the 512-character cap.
///
/// The cap is measured in `char`s and the cut always lands on a char boundary, so the
/// result is valid UTF-8 for any input.
fn trim_to_three_sentences(text: &str) -> String {
    const MAX_CHARS: usize = 512;
    const MAX_SENTENCES: usize = 3;

    let text = text.trim();

    // Byte offset just past the `MAX_CHARS`-th character, or the full length when the
    // text is shorter than the cap.
    let hard_cap = text
        .char_indices()
        .nth(MAX_CHARS)
        .map_or(text.len(), |(byte_idx, _)| byte_idx);

    let mut cut = hard_cap;
    let mut sentences = 0usize;
    let mut chars = text.char_indices().peekable();
    while let Some((byte_idx, ch)) = chars.next() {
        if byte_idx >= hard_cap {
            // Anything from here on is beyond the hard cap anyway.
            break;
        }
        if !matches!(ch, '.' | '!' | '?') {
            continue;
        }
        let at_boundary = match chars.peek() {
            None => true,
            Some(&(_, next)) => next.is_whitespace(),
        };
        if at_boundary {
            sentences += 1;
            if sentences == MAX_SENTENCES {
                // Cut only once the sentence budget is exhausted; with fewer sentences
                // the remaining text must not be dropped.
                cut = byte_idx + ch.len_utf8();
                break;
            }
        }
    }

    text[..cut].to_owned()
}
864
/// Unit tests for the reasoning-memory helpers and the `ReasoningMemory` store.
///
/// Store tests run against a throw-away in-memory `SQLite` database created by
/// `make_test_pool`; LLM-facing tests use `MockProvider` with canned responses.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Outcome ────────────────────────────────────────────────────────────────

    #[test]
    fn outcome_as_str_round_trip() {
        assert_eq!(Outcome::Success.as_str(), "success");
        assert_eq!(Outcome::Failure.as_str(), "failure");
    }

    #[test]
    fn outcome_from_str_success() {
        assert_eq!(Outcome::from_str("success").unwrap(), Outcome::Success);
    }

    #[test]
    fn outcome_from_str_failure() {
        assert_eq!(Outcome::from_str("failure").unwrap(), Outcome::Failure);
    }

    #[test]
    fn outcome_from_str_unknown_defaults_to_failure() {
        // Unknown values silently map to Failure (forward-compatible).
        assert_eq!(Outcome::from_str("partial").unwrap(), Outcome::Failure);
    }

    // ── parse_self_judge_response ─────────────────────────────────────────────

    #[test]
    fn parse_direct_json() {
        let json = r#"{"success":true,"reasoning_chain":"tried X","task_hint":"do Y"}"#;
        let outcome = parse_self_judge_response(json).unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.reasoning_chain, "tried X");
        assert_eq!(outcome.task_hint, "do Y");
    }

    #[test]
    fn parse_json_with_markdown_fences() {
        let response =
            "```json\n{\"success\":false,\"reasoning_chain\":\"r\",\"task_hint\":\"t\"}\n```";
        let outcome = parse_self_judge_response(response).unwrap();
        assert!(!outcome.success);
    }

    #[test]
    fn parse_json_embedded_in_prose() {
        let response = r#"Here is the evaluation: {"success":true,"reasoning_chain":"chain","task_hint":"hint"} — done."#;
        let outcome = parse_self_judge_response(response).unwrap();
        assert!(outcome.success);
    }

    #[test]
    fn parse_invalid_returns_none() {
        let outcome = parse_self_judge_response("not json at all");
        assert!(outcome.is_none());
    }

    // ── trim_to_three_sentences ───────────────────────────────────────────────

    #[test]
    fn trim_three_sentences_short_text() {
        let text = "One. Two. Three.";
        assert_eq!(trim_to_three_sentences(text), "One. Two. Three.");
    }

    #[test]
    fn trim_three_sentences_truncates_at_third() {
        let text = "One. Two. Three. Four. Five.";
        let result = trim_to_three_sentences(text);
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(!result.contains("Four"));
    }

    #[test]
    fn trim_three_sentences_hard_cap() {
        // 600 chars, no sentence boundaries → must be capped at exactly 512 chars.
        // An exact check also catches an implementation that truncates too much.
        let long: String = "x".repeat(600);
        let result = trim_to_three_sentences(&long);
        assert_eq!(result.chars().count(), 512);
    }

    #[test]
    fn trim_three_sentences_empty() {
        assert_eq!(trim_to_three_sentences("   "), "");
    }

    // ── ReasoningMemory (in-memory SQLite) ────────────────────────────────────

    /// Create an in-memory `SQLite` pool containing an empty `reasoning_strategies` table.
    async fn make_test_pool() -> DbPool {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        sqlx::query(
            "CREATE TABLE reasoning_strategies (
                id           TEXT    PRIMARY KEY NOT NULL,
                summary      TEXT    NOT NULL,
                outcome      TEXT    NOT NULL,
                task_hint    TEXT    NOT NULL,
                created_at   INTEGER NOT NULL DEFAULT (unixepoch('now')),
                last_used_at INTEGER NOT NULL DEFAULT (unixepoch('now')),
                use_count    INTEGER NOT NULL DEFAULT 0,
                embedded_at  INTEGER
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        pool
    }

    /// Build a successful, never-used strategy whose summary and task hint embed `id`.
    fn make_strategy(id: &str) -> ReasoningStrategy {
        ReasoningStrategy {
            id: id.to_owned(),
            summary: format!("Summary for {id}"),
            outcome: Outcome::Success,
            task_hint: format!("Task hint for {id}"),
            created_at: 0,
            last_used_at: 0,
            use_count: 0,
            embedded_at: None,
        }
    }

    #[tokio::test]
    async fn insert_and_fetch_by_ids() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        let s = make_strategy("abc-123");
        mem.insert(&s, vec![]).await.unwrap();

        let rows = mem.fetch_by_ids(&["abc-123".to_owned()]).await.unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].id, "abc-123");
        assert_eq!(rows[0].outcome, Outcome::Success);
    }

    #[tokio::test]
    async fn mark_used_increments_count() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        let s = make_strategy("mark-1");
        mem.insert(&s, vec![]).await.unwrap();
        mem.mark_used(&["mark-1".to_owned()]).await.unwrap();
        mem.mark_used(&["mark-1".to_owned()]).await.unwrap();

        let rows = mem.fetch_by_ids(&["mark-1".to_owned()]).await.unwrap();
        assert_eq!(rows[0].use_count, 2);
    }

    #[tokio::test]
    async fn mark_used_empty_is_noop() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);
        // Should not panic or error on empty slice.
        mem.mark_used(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn count_returns_correct_total() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        for i in 0..5 {
            mem.insert(&make_strategy(&format!("s{i}")), vec![])
                .await
                .unwrap();
        }

        assert_eq!(mem.count().await.unwrap(), 5);
    }

    #[tokio::test]
    async fn evict_lru_cold_rows() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 5 cold rows (use_count = 0 by default).
        for i in 0..5 {
            mem.insert(&make_strategy(&format!("cold-{i}")), vec![])
                .await
                .unwrap();
        }

        // Store limit is 3 → should delete 2 oldest.
        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(mem.count().await.unwrap(), 3);
    }

    #[tokio::test]
    async fn evict_lru_respects_hot_rows_under_ceiling() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 5 rows and push each one past HOT_STRATEGY_USE_COUNT so it counts as hot.
        for i in 0..5 {
            let id = format!("hot-{i}");
            mem.insert(&make_strategy(&id), vec![]).await.unwrap();
            // One more use than the threshold makes the row hot.
            for _ in 0..=HOT_STRATEGY_USE_COUNT {
                mem.mark_used(std::slice::from_ref(&id)).await.unwrap();
            }
        }

        // store_limit=3, count=5, all hot, 5 < 2*3=6 → under ceiling → no deletion.
        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(mem.count().await.unwrap(), 5);
    }

    #[tokio::test]
    async fn evict_lru_hard_ceiling_forces_deletion() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 7 hot rows. store_limit=3, ceiling=6. 7 > 6 → forced eviction.
        for i in 0..7 {
            let id = format!("hot2-{i}");
            mem.insert(&make_strategy(&id), vec![]).await.unwrap();
            // Make hot.
            for _ in 0..=HOT_STRATEGY_USE_COUNT {
                mem.mark_used(std::slice::from_ref(&id)).await.unwrap();
            }
        }

        let deleted = mem.evict_lru(3).await.unwrap();
        assert!(deleted > 0, "expected forced deletion");
        let remaining = mem.count().await.unwrap();
        assert_eq!(remaining, 3, "should be trimmed to store_limit");
    }

    #[tokio::test]
    async fn evict_lru_no_op_when_under_limit() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        for i in 0..3 {
            mem.insert(&make_strategy(&format!("s{i}")), vec![])
                .await
                .unwrap();
        }

        // store_limit=10 → count(3) ≤ 10 → no deletion.
        let deleted = mem.evict_lru(10).await.unwrap();
        assert_eq!(deleted, 0);
    }

    // ── mark_used chunked path ────────────────────────────────────────────────

    #[tokio::test]
    async fn mark_used_chunked_over_490_ids() {
        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);

        // Insert 500 strategies — exceeds MAX_IDS_PER_QUERY (490) forcing two SQL batches.
        for i in 0..500usize {
            mem.insert(&make_strategy(&format!("chunked-{i}")), vec![])
                .await
                .unwrap();
        }

        let ids: Vec<String> = (0..500usize).map(|i| format!("chunked-{i}")).collect();
        mem.mark_used(&ids).await.unwrap();

        // Spot-check: first and 491st should both have use_count == 1.
        let first = mem.fetch_by_ids(&[ids[0].clone()]).await.unwrap();
        let over_chunk = mem.fetch_by_ids(&[ids[490].clone()]).await.unwrap();
        assert_eq!(first[0].use_count, 1, "first id should have use_count = 1");
        assert_eq!(
            over_chunk[0].use_count, 1,
            "id past the chunk boundary should have use_count = 1"
        );
    }

    // ── run_self_judge malformed response ─────────────────────────────────────

    #[tokio::test]
    async fn run_self_judge_malformed_json_returns_none() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        // with_responses populates the one-shot queue; chat() returns this prose string.
        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
            "This is not JSON at all.".to_string(),
        ]));
        let msgs = vec![Message::from_legacy(Role::User, "hello")];
        let result = run_self_judge(&provider, &msgs, std::time::Duration::from_secs(5)).await;
        assert!(result.is_none(), "malformed LLM response must return None");
    }

    // ── distill_strategy truncation ───────────────────────────────────────────

    #[tokio::test]
    async fn distill_strategy_truncates_to_three_sentences() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let long_response = "One. Two. Three. Four. Five.";
        let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
            long_response.to_string(),
        ]));
        let result = distill_strategy(
            &provider,
            Outcome::Success,
            "chain here",
            std::time::Duration::from_secs(5),
        )
        .await
        .unwrap();
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(
            !result.contains("Four"),
            "should not contain 4th sentence: {result}"
        );
    }

    // ── process_turn smoke test ───────────────────────────────────────────────

    #[tokio::test]
    async fn process_turn_with_empty_messages_is_noop() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let pool = make_test_pool().await;
        let mem = ReasoningMemory::new(pool, None);
        // MockProvider returns "{}" which parse_self_judge_response will return None for
        // (missing required fields) → Ok(()) with zero inserts.
        let provider = AnyProvider::Mock(MockProvider::default());
        let cfg = ProcessTurnConfig {
            store_limit: 100,
            extraction_timeout: std::time::Duration::from_secs(1),
            distill_timeout: std::time::Duration::from_secs(1),
            embed_timeout: std::time::Duration::from_secs(5),
            self_judge_window: 2,
            min_assistant_chars: 0,
        };
        let result = process_turn(&mem, &provider, &provider, &provider, &[], cfg).await;
        assert!(
            result.is_ok(),
            "process_turn with empty messages must succeed"
        );
        assert_eq!(
            mem.count().await.unwrap(),
            0,
            "no strategies should be stored"
        );
    }
}