use std::str::FromStr;
use std::time::Duration;
use serde::Deserialize;
use tokio::time::timeout;
use zeph_db::{ActiveDialect, DbPool, placeholder_list};
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::{LlmProvider as _, Message, Role};
use crate::error::MemoryError;
use crate::vector_store::VectorStore;
/// Use-count threshold separating "cold" from "hot" strategies: `delete_oldest_cold` only
/// removes rows whose `use_count` is less than or equal to this value.
const HOT_STRATEGY_USE_COUNT: i64 = 10;
/// Maximum number of ids bound into a single `IN (...)` list; `mark_used` and `fetch_by_ids`
/// split their input into chunks of this size.
// NOTE(review): presumably chosen to stay below the backend's bind-parameter limit — confirm.
const MAX_IDS_PER_QUERY: usize = 490;
/// System prompt for the self-judge LLM call made by `run_self_judge`.
///
/// It asks the model for a bare JSON object with `success`, `reasoning_chain` and `task_hint`
/// keys, which `parse_self_judge_response` deserializes into `SelfJudgeOutcome`.
const SELF_JUDGE_SYSTEM: &str = "\
You are a task outcome evaluator. Given an agent turn transcript, analyze the conversation and determine:
1. Did the agent successfully complete the user's request? (true/false)
2. Extract the key reasoning steps the agent took (reasoning chain).
3. Summarize the task in one sentence (task hint).
Respond ONLY with valid JSON, no markdown fences, no prose:
{\"success\": bool, \"reasoning_chain\": \"string\", \"task_hint\": \"string\"}";
/// System prompt for the distillation LLM call made by `distill_strategy`.
///
/// It asks for a plain-text strategy of at most three sentences; `distill_strategy` additionally
/// enforces a length cap on the reply through `trim_to_three_sentences`.
const DISTILL_SYSTEM: &str = "\
You are a strategy distiller. Given a reasoning chain from an agent turn, distill it into \
a short generalizable strategy (at most 3 sentences) that could help an agent facing a similar \
task. Focus on the transferable principle, not the specific instance. \
Respond with the strategy text only — no headers, no lists, no markdown.";
/// Result of an agent turn as judged by the self-judge step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    /// The agent completed the user's request.
    Success,
    /// The agent did not complete the user's request.
    Failure,
}

impl Outcome {
    /// Returns the lowercase label (`"success"` / `"failure"`) used when the outcome is
    /// written to the database, the vector payload and LLM prompts.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Success => "success",
            Self::Failure => "failure",
        }
    }
}
/// Error type declared as `FromStr::Err` for `Outcome`; it renders as
/// `unknown outcome: <value>`.
// NOTE(review): the `FromStr` impl in this file maps unknown strings to `Outcome::Failure`
// and never constructs this error — confirm whether it is still needed.
#[derive(Debug, thiserror::Error)]
#[error("unknown outcome: {0}")]
pub struct OutcomeParseError(String);
/// Parses the label produced by `Outcome::as_str`.
///
/// Parsing is lenient: any unrecognized value is logged at `warn` level and mapped to
/// `Outcome::Failure`, so this implementation never returns `Err`.
impl FromStr for Outcome {
    type Err = OutcomeParseError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "success" => Self::Success,
            "failure" => Self::Failure,
            unknown => {
                tracing::warn!(
                    value = unknown,
                    "reasoning: unknown outcome, defaulting to Failure"
                );
                Self::Failure
            }
        };
        Ok(parsed)
    }
}
/// A distilled reasoning strategy, mirroring one row of the `reasoning_strategies` table.
#[derive(Debug, Clone)]
pub struct ReasoningStrategy {
/// Primary key of the row; `process_turn` generates it as a UUID v4 string.
pub id: String,
/// Distilled strategy text (the output of `distill_strategy` when created by `process_turn`).
pub summary: String,
/// Whether the originating turn was judged a success or a failure.
pub outcome: Outcome,
/// Short task description produced by the self-judge step.
pub task_hint: String,
/// Creation timestamp. `ReasoningMemory::insert` ignores the value on the struct and lets the
/// database fill it from the dialect's `EPOCH_NOW` expression.
// NOTE(review): presumably Unix epoch seconds — confirm against the dialect definition.
pub created_at: i64,
/// Timestamp of the last use; set at insert time and refreshed by `ReasoningMemory::mark_used`.
pub last_used_at: i64,
/// Number of times the strategy was marked as used; starts at 0 on insert.
pub use_count: i64,
/// Timestamp recorded after a successful vector-store upsert; `None` while the row has no
/// recorded embedding.
pub embedded_at: Option<i64>,
}
/// Parsed reply of the self-judge LLM call, deserialized from the JSON object requested by
/// `SELF_JUDGE_SYSTEM`.
#[derive(Debug, Deserialize)]
pub struct SelfJudgeOutcome {
/// `true` when the model judged that the agent completed the user's request.
pub success: bool,
/// Key reasoning steps extracted by the model; `process_turn` feeds this to `distill_strategy`.
pub reasoning_chain: String,
/// One-sentence task summary; `process_turn` stores it as `ReasoningStrategy::task_hint`.
pub task_hint: String,
}
/// Store for reasoning strategies backed by the `reasoning_strategies` SQL table, with an
/// optional vector store used for similarity retrieval.
pub struct ReasoningMemory {
// Connection pool every SQL statement in this type is executed against.
pool: DbPool,
// Optional vector index; when `None`, inserts are SQL-only and `retrieve_by_embedding`
// returns an empty list.
vector_store: Option<std::sync::Arc<dyn VectorStore>>,
}
/// Name of the vector-store collection that `ReasoningMemory` upserts into and searches.
pub const REASONING_COLLECTION: &str = "reasoning_strategies";
impl ReasoningMemory {
#[must_use]
pub fn new(pool: DbPool, vector_store: Option<std::sync::Arc<dyn VectorStore>>) -> Self {
Self { pool, vector_store }
}
#[tracing::instrument(name = "memory.reasoning.insert", skip(self, embedding), fields(id = %strategy.id))]
pub async fn insert(
&self,
strategy: &ReasoningStrategy,
embedding: Vec<f32>,
) -> Result<(), MemoryError> {
let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
let raw = format!(
"INSERT OR REPLACE INTO reasoning_strategies \
(id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at) \
VALUES (?, ?, ?, ?, {epoch_now}, {epoch_now}, 0, NULL)"
);
let sql = zeph_db::rewrite_placeholders(&raw);
zeph_db::query(&sql)
.bind(&strategy.id)
.bind(&strategy.summary)
.bind(strategy.outcome.as_str())
.bind(&strategy.task_hint)
.execute(&self.pool)
.await?;
if let Some(ref vs) = self.vector_store {
let point = crate::vector_store::VectorPoint {
id: strategy.id.clone(),
vector: embedding,
payload: std::collections::HashMap::from([
(
"outcome".to_owned(),
serde_json::Value::String(strategy.outcome.as_str().to_owned()),
),
(
"task_hint".to_owned(),
serde_json::Value::String(strategy.task_hint.clone()),
),
]),
};
if let Err(e) = vs.upsert(REASONING_COLLECTION, vec![point]).await {
tracing::warn!(error = %e, id = %strategy.id, "reasoning: Qdrant upsert failed — SQLite-only mode");
} else {
let update_sql = zeph_db::rewrite_placeholders(&format!(
"UPDATE reasoning_strategies SET embedded_at = {epoch_now} WHERE id = ?"
));
if let Err(e) = zeph_db::query(&update_sql)
.bind(&strategy.id)
.execute(&self.pool)
.await
{
tracing::warn!(error = %e, "reasoning: failed to set embedded_at");
}
}
}
tracing::debug!(id = %strategy.id, outcome = strategy.outcome.as_str(), "reasoning: strategy inserted");
Ok(())
}
/// Returns up to `top_k` stored strategies similar to `embedding`, in the order the vector
/// store ranked them.
///
/// The vector store is searched in `REASONING_COLLECTION` and the hit ids are resolved to
/// full rows through `fetch_by_ids`. That lookup uses `WHERE id IN (...)` without an
/// `ORDER BY`, so rows come back in no particular order. They are therefore re-sorted to
/// follow the hit list. Hits whose row no longer exists in SQL are dropped.
///
/// Returns an empty list when no vector store is configured or the search has no hits.
///
/// # Errors
///
/// Propagates failures of the vector search and of the SQL lookup.
// NOTE(review): assumes `VectorStore::search` returns hits best-first — confirm.
#[tracing::instrument(
    name = "memory.reasoning.retrieve_by_embedding",
    skip(self, embedding),
    fields(top_k)
)]
pub async fn retrieve_by_embedding(
    &self,
    embedding: &[f32],
    top_k: u64,
) -> Result<Vec<ReasoningStrategy>, MemoryError> {
    let Some(ref vs) = self.vector_store else {
        return Ok(Vec::new());
    };
    let scored = vs
        .search(REASONING_COLLECTION, embedding.to_vec(), top_k, None)
        .await?;
    if scored.is_empty() {
        return Ok(Vec::new());
    }

    let ranked_ids: Vec<String> = scored.into_iter().map(|p| p.id).collect();

    // Index the fetched rows by id, then walk the hit list so the similarity ranking survives.
    let mut rows_by_id: std::collections::HashMap<String, ReasoningStrategy> = self
        .fetch_by_ids(&ranked_ids)
        .await?
        .into_iter()
        .map(|row| (row.id.clone(), row))
        .collect();

    Ok(ranked_ids
        .iter()
        .filter_map(|id| rows_by_id.remove(id))
        .collect())
}
/// Records one use of every strategy in `ids`: bumps `use_count` by one and sets
/// `last_used_at` to the dialect's `EPOCH_NOW` expression.
///
/// Ids are processed in chunks of `MAX_IDS_PER_QUERY`, one `UPDATE` per chunk. An empty slice
/// produces no chunks and therefore issues no query.
///
/// # Errors
///
/// Returns the first SQL error; chunks already processed stay updated.
#[tracing::instrument(name = "memory.reasoning.mark_used", skip(self), fields(n = ids.len()))]
pub async fn mark_used(&self, ids: &[String]) -> Result<(), MemoryError> {
    let now_expr = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;

    for batch in ids.chunks(MAX_IDS_PER_QUERY) {
        let placeholders = placeholder_list(1, batch.len());
        let statement = format!(
            "UPDATE reasoning_strategies \
             SET use_count = use_count + 1, last_used_at = {now_expr} \
             WHERE id IN ({placeholders})"
        );
        let update = batch
            .iter()
            .fold(zeph_db::query(&statement), |q, id| q.bind(id.as_str()));
        update.execute(&self.pool).await?;
    }

    Ok(())
}
/// Trims the store towards `store_limit` and returns how many rows were deleted.
///
/// The policy has three tiers:
/// 1. At or below `store_limit`: nothing is deleted.
/// 2. Above the limit: the least recently used cold rows (see `delete_oldest_cold`) are
///    deleted, up to the excess. If at least one row was removed the call returns here.
/// 3. No cold rows were available: growth is tolerated while the count stays at or below
///    `2 * store_limit`. Above that ceiling the least recently used rows are deleted
///    regardless of their use count.
///
/// # Errors
///
/// Propagates SQL errors from the count and delete statements.
#[tracing::instrument(name = "memory.reasoning.evict_lru", skip(self), fields(store_limit))]
pub async fn evict_lru(&self, store_limit: usize) -> Result<usize, MemoryError> {
    let total = self.count().await?;
    if total <= store_limit {
        return Ok(0);
    }
    let excess = total - store_limit;

    let cold_removed = self.delete_oldest_cold(excess).await?;
    if cold_removed > 0 {
        tracing::debug!(
            deleted = cold_removed,
            count = total,
            "reasoning: evicted cold strategies"
        );
        return Ok(cold_removed);
    }

    let ceiling = store_limit.saturating_mul(2);
    if total <= ceiling {
        tracing::debug!(
            count = total,
            store_limit,
            "reasoning: hot saturation — growth allowed under 2x ceiling"
        );
        return Ok(0);
    }

    let forced_removed = self.delete_oldest_unconditional(excess).await?;
    tracing::warn!(
        deleted = forced_removed,
        count = total,
        hard_ceiling = ceiling,
        "reasoning: hard-ceiling eviction — evicted hot strategies; consider raising store_limit"
    );
    Ok(forced_removed)
}
/// Returns the number of rows currently in `reasoning_strategies`.
///
/// A value that cannot be represented as `usize` (for example a negative count) is reported
/// as 0.
///
/// # Errors
///
/// Propagates the SQL error if the `COUNT(*)` query fails.
pub async fn count(&self) -> Result<usize, MemoryError> {
    let (total,): (i64,) = zeph_db::query_as("SELECT COUNT(*) FROM reasoning_strategies")
        .fetch_one(&self.pool)
        .await?;
    Ok(usize::try_from(total).unwrap_or(0))
}
/// Loads the rows whose id appears in `ids`.
///
/// Ids are looked up in chunks of `MAX_IDS_PER_QUERY` with `WHERE id IN (...)`; the statement
/// has no `ORDER BY`, so the result order is whatever the database returns. Ids without a
/// matching row are simply absent from the result. An outcome label that is not recognized
/// is read as `Outcome::Failure`.
///
/// # Errors
///
/// Returns the first SQL error encountered.
pub(crate) async fn fetch_by_ids(
    &self,
    ids: &[String],
) -> Result<Vec<ReasoningStrategy>, MemoryError> {
    // Column layout of the SELECT below.
    type Row = (String, String, String, String, i64, i64, i64, Option<i64>);

    let mut found = Vec::with_capacity(ids.len());
    for batch in ids.chunks(MAX_IDS_PER_QUERY) {
        let placeholders = placeholder_list(1, batch.len());
        let select = format!(
            "SELECT id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at \
             FROM reasoning_strategies WHERE id IN ({placeholders})"
        );

        let mut lookup = zeph_db::query_as::<_, Row>(&select);
        for id in batch {
            lookup = lookup.bind(id.as_str());
        }
        let rows = lookup.fetch_all(&self.pool).await?;

        found.extend(rows.into_iter().map(
            |(
                id,
                summary,
                outcome_label,
                task_hint,
                created_at,
                last_used_at,
                use_count,
                embedded_at,
            )| ReasoningStrategy {
                id,
                summary,
                outcome: Outcome::from_str(&outcome_label).unwrap_or(Outcome::Failure),
                task_hint,
                created_at,
                last_used_at,
                use_count,
                embedded_at,
            },
        ));
    }
    Ok(found)
}
/// Deletes up to `n` cold rows — those with `use_count <= HOT_STRATEGY_USE_COUNT` — starting
/// with the smallest `last_used_at`, and returns the number of rows removed.
///
/// # Errors
///
/// Propagates the SQL error if the delete fails.
async fn delete_oldest_cold(&self, n: usize) -> Result<usize, MemoryError> {
    let sql = zeph_db::rewrite_placeholders(&format!(
        "DELETE FROM reasoning_strategies \
         WHERE id IN ( \
         SELECT id FROM reasoning_strategies \
         WHERE use_count <= {HOT_STRATEGY_USE_COUNT} \
         ORDER BY last_used_at ASC LIMIT ? \
         )"
    ));
    // The LIMIT parameter is bound as i64; an `n` beyond that range means "no limit".
    let max_rows = i64::try_from(n).unwrap_or(i64::MAX);
    let outcome = zeph_db::query(&sql)
        .bind(max_rows)
        .execute(&self.pool)
        .await?;
    Ok(usize::try_from(outcome.rows_affected()).unwrap_or(0))
}
/// Deletes up to `n` rows with the smallest `last_used_at`, regardless of their use count,
/// and returns the number of rows removed.
///
/// # Errors
///
/// Propagates the SQL error if the delete fails.
async fn delete_oldest_unconditional(&self, n: usize) -> Result<usize, MemoryError> {
    const DELETE_OLDEST: &str = "DELETE FROM reasoning_strategies \
        WHERE id IN ( \
        SELECT id FROM reasoning_strategies \
        ORDER BY last_used_at ASC LIMIT ? \
        )";

    let sql = zeph_db::rewrite_placeholders(DELETE_OLDEST);
    // The LIMIT parameter is bound as i64; an `n` beyond that range means "no limit".
    let max_rows = i64::try_from(n).unwrap_or(i64::MAX);
    let outcome = zeph_db::query(&sql)
        .bind(max_rows)
        .execute(&self.pool)
        .await?;
    Ok(usize::try_from(outcome.rows_affected()).unwrap_or(0))
}
}
/// Asks `provider` to judge the turn described by `messages`.
///
/// The messages are rendered into a transcript prompt and sent together with
/// `SELF_JUDGE_SYSTEM`; the call is bounded by `extraction_timeout`.
///
/// Returns `None` when `messages` is empty, when the LLM call fails or times out (both logged
/// at `warn` level), or when the reply cannot be parsed as a `SelfJudgeOutcome`.
#[tracing::instrument(name = "memory.reasoning.self_judge", skip(provider, messages), fields(n = messages.len()))]
pub async fn run_self_judge(
    provider: &AnyProvider,
    messages: &[Message],
    extraction_timeout: Duration,
) -> Option<SelfJudgeOutcome> {
    if messages.is_empty() {
        return None;
    }

    let request = [
        Message::from_legacy(Role::System, SELF_JUDGE_SYSTEM),
        Message::from_legacy(Role::User, build_transcript_prompt(messages)),
    ];

    let Ok(chat_result) = timeout(extraction_timeout, provider.chat(&request)).await else {
        tracing::warn!("reasoning: self-judge timed out");
        return None;
    };
    let reply = match chat_result {
        Ok(text) => text,
        Err(e) => {
            tracing::warn!(error = %e, "reasoning: self-judge LLM call failed");
            return None;
        }
    };

    parse_self_judge_response(&reply)
}
/// Asks `provider` to condense `reasoning_chain` into a short, reusable strategy.
///
/// The prompt carries the outcome label and the chain and is sent with `DISTILL_SYSTEM`; the
/// call is bounded by `distill_timeout`. The reply is cut down with
/// `trim_to_three_sentences` before it is returned.
///
/// Returns `None` when `reasoning_chain` is empty, when the LLM call fails or times out (both
/// logged at `warn` level), or when nothing is left after trimming.
#[tracing::instrument(name = "memory.reasoning.distill", skip(provider, reasoning_chain))]
pub async fn distill_strategy(
    provider: &AnyProvider,
    outcome: Outcome,
    reasoning_chain: &str,
    distill_timeout: Duration,
) -> Option<String> {
    if reasoning_chain.is_empty() {
        return None;
    }

    let outcome_label = outcome.as_str();
    let request = [
        Message::from_legacy(Role::System, DISTILL_SYSTEM),
        Message::from_legacy(
            Role::User,
            format!("Outcome: {outcome_label}\n\nReasoning chain:\n{reasoning_chain}"),
        ),
    ];

    let Ok(chat_result) = timeout(distill_timeout, provider.chat(&request)).await else {
        tracing::warn!("reasoning: distillation timed out");
        return None;
    };
    let reply = match chat_result {
        Ok(text) => text,
        Err(e) => {
            tracing::warn!(error = %e, "reasoning: distillation LLM call failed");
            return None;
        }
    };

    let strategy = trim_to_three_sentences(&reply);
    (!strategy.is_empty()).then_some(strategy)
}
/// Tuning knobs for `process_turn`.
#[derive(Debug, Clone, Copy)]
pub struct ProcessTurnConfig {
/// Target size of the store; passed to `ReasoningMemory::evict_lru` after an insert when the
/// store already held at least this many rows.
pub store_limit: usize,
/// Upper bound on the self-judge LLM call.
pub extraction_timeout: Duration,
/// Upper bound on the distillation LLM call.
pub distill_timeout: Duration,
/// Number of trailing messages handed to the self-judge step; earlier messages are ignored.
pub self_judge_window: usize,
/// Minimum length of the last assistant message inside the window; shorter turns are skipped
/// before any LLM call is made.
pub min_assistant_chars: usize,
}
/// Runs the post-turn reasoning pipeline: self-judge, distill, embed, store, evict.
///
/// 1. Only the last `cfg.self_judge_window` messages are considered.
/// 2. The turn is skipped when the last assistant message in that window has fewer than
///    `cfg.min_assistant_chars` characters. The length is measured in Unicode scalar values
///    (`char`s), like the other length limits in this file, so multi-byte text is not
///    over-counted.
/// 3. `extract_provider` judges the turn, `distill_provider` condenses the reasoning chain,
///    and `embed_provider` embeds `"{task_hint}\n{summary}"`.
/// 4. The strategy is inserted under a fresh UUID v4. If the store already held at least
///    `cfg.store_limit` rows before the insert, `evict_lru` is invoked.
///
/// # Errors
///
/// Every failure in the steps above is logged (or silently skipped) and the function still
/// returns `Ok(())`; the pipeline is best-effort and never fails the caller.
#[tracing::instrument(name = "memory.reasoning.process_turn", skip_all)]
pub async fn process_turn(
    memory: &ReasoningMemory,
    extract_provider: &AnyProvider,
    distill_provider: &AnyProvider,
    embed_provider: &AnyProvider,
    messages: &[Message],
    cfg: ProcessTurnConfig,
) -> Result<(), MemoryError> {
    let ProcessTurnConfig {
        store_limit,
        extraction_timeout,
        distill_timeout,
        self_judge_window,
        min_assistant_chars,
    } = cfg;

    let judge_messages = if messages.len() > self_judge_window {
        &messages[messages.len() - self_judge_window..]
    } else {
        messages
    };

    // Count characters, not bytes: `String::len` would inflate the length of non-ASCII
    // replies and let trivial turns slip past the threshold.
    let last_assistant_chars = judge_messages
        .iter()
        .rev()
        .find(|m| m.role == Role::Assistant)
        .map_or(0, |m| m.content.chars().count());
    if last_assistant_chars < min_assistant_chars {
        return Ok(());
    }

    let Some(outcome) = run_self_judge(extract_provider, judge_messages, extraction_timeout).await
    else {
        return Ok(());
    };
    let outcome_enum = if outcome.success {
        Outcome::Success
    } else {
        Outcome::Failure
    };

    let Some(summary) = distill_strategy(
        distill_provider,
        outcome_enum,
        &outcome.reasoning_chain,
        distill_timeout,
    )
    .await
    else {
        return Ok(());
    };

    let embed_input = format!("{}\n{}", outcome.task_hint, summary);
    let embedding = match embed_provider.embed(&embed_input).await {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!(error = %e, "reasoning: embedding failed — strategy not stored");
            return Ok(());
        }
    };

    let id = uuid::Uuid::new_v4().to_string();
    let strategy = ReasoningStrategy {
        id,
        summary,
        outcome: outcome_enum,
        task_hint: outcome.task_hint,
        // Timestamps and counters are filled in by the database on insert.
        created_at: 0,
        last_used_at: 0,
        use_count: 0,
        embedded_at: None,
    };

    // Sampled before the insert: eviction is only needed when the new row pushes the store
    // past its limit. A failed count is treated as 0, which skips eviction for this turn.
    let count_before = memory.count().await.unwrap_or(0);
    if let Err(e) = memory.insert(&strategy, embedding).await {
        tracing::warn!(error = %e, "reasoning: insert failed");
        return Ok(());
    }
    if count_before >= store_limit
        && let Err(e) = memory.evict_lru(store_limit).await
    {
        tracing::warn!(error = %e, "reasoning: evict_lru failed");
    }
    Ok(())
}
/// Maximum number of `char`s of each message's content that `build_transcript_prompt` copies
/// into the self-judge prompt; longer content is cut at this length.
const MAX_TRANSCRIPT_MESSAGE_CHARS: usize = 2000;
/// Renders `messages` as the user prompt for the self-judge call.
///
/// Output layout: a header line, then one `[n] Role: content` line per message (1-based,
/// role printed with its `Debug` form), then a closing instruction. Content longer than
/// `MAX_TRANSCRIPT_MESSAGE_CHARS` characters is cut at a `char` boundary.
fn build_transcript_prompt(messages: &[Message]) -> String {
    use std::fmt::Write as _;

    let mut transcript = String::from("Agent turn messages:\n");
    for (index, message) in messages.iter().enumerate() {
        let body = message.content.as_str();
        // The char at position MAX exists only when the content is longer than the limit; its
        // byte offset is exactly where the first MAX chars end.
        let clipped = match body.char_indices().nth(MAX_TRANSCRIPT_MESSAGE_CHARS) {
            Some((cut, _)) => &body[..cut],
            None => body,
        };
        // Writing into a `String` cannot fail.
        let _ = writeln!(
            transcript,
            "[{}] {:?}: {}",
            index + 1,
            message.role,
            clipped
        );
    }
    transcript.push_str("\nEvaluate this turn and return JSON.");
    transcript
}
/// Extracts a `SelfJudgeOutcome` from a raw LLM reply.
///
/// Surrounding whitespace and Markdown code fences are stripped first. The remainder is
/// parsed as JSON directly; if that fails, the span from the first `{` to the last `}` is
/// tried instead. When both attempts fail a warning with the reply length and its first 200
/// characters is logged and `None` is returned.
fn parse_self_judge_response(response: &str) -> Option<SelfJudgeOutcome> {
    let body = response
        .trim()
        .trim_start_matches("```json")
        .trim_start_matches("```")
        .trim_end_matches("```")
        .trim();

    let parsed = serde_json::from_str::<SelfJudgeOutcome>(body)
        .ok()
        .or_else(|| {
            // Fallback for replies that wrap the JSON object in prose.
            let open = body.find('{')?;
            let close = body.rfind('}')?;
            if close <= open {
                return None;
            }
            serde_json::from_str::<SelfJudgeOutcome>(&body[open..=close]).ok()
        });

    if parsed.is_none() {
        tracing::warn!(
            "reasoning: failed to parse self-judge response (len={}): {:.200}",
            response.len(),
            response
        );
    }
    parsed
}
/// Cuts `text` down to at most three sentences and at most 512 characters.
///
/// A sentence ends at `.`, `!` or `?` when that character is followed by whitespace or is the
/// last character of the trimmed input. The result runs up to the last sentence end found
/// (at most the third), so a trailing fragment without a terminator is dropped once at least
/// one sentence was recognized. Without any sentence end the text is kept as is. In all
/// cases the result is capped at 512 `char`s.
fn trim_to_three_sentences(text: &str) -> String {
    const MAX_CHARS: usize = 512;
    const MAX_SENTENCES: usize = 3;

    let text = text.trim();

    // Character count up to and including the most recent sentence terminator.
    let mut last_sentence_end: Option<usize> = None;
    let mut sentences_seen = 0usize;

    let mut upcoming = text.chars().enumerate().peekable();
    while let Some((position, ch)) = upcoming.next() {
        if !matches!(ch, '.' | '!' | '?') {
            continue;
        }
        let at_boundary = upcoming
            .peek()
            .map_or(true, |&(_, following)| following.is_whitespace());
        if at_boundary {
            last_sentence_end = Some(position + 1);
            sentences_seen += 1;
            if sentences_seen >= MAX_SENTENCES {
                break;
            }
        }
    }

    let budget = last_sentence_end.map_or(MAX_CHARS, |end| end.min(MAX_CHARS));
    text.chars().take(budget).collect()
}
// Unit tests: pure helpers first, then `ReasoningMemory` against an in-memory SQLite pool,
// then the LLM-facing functions driven by the mock provider.
#[cfg(test)]
mod tests {
use super::*;
// NOTE(review): despite the name, only `as_str` is exercised here; parsing is covered by the
// `outcome_from_str_*` tests below.
#[test]
fn outcome_as_str_round_trip() {
assert_eq!(Outcome::Success.as_str(), "success");
assert_eq!(Outcome::Failure.as_str(), "failure");
}
#[test]
fn outcome_from_str_success() {
assert_eq!(Outcome::from_str("success").unwrap(), Outcome::Success);
}
#[test]
fn outcome_from_str_failure() {
assert_eq!(Outcome::from_str("failure").unwrap(), Outcome::Failure);
}
// Unknown labels are not an error: they parse as `Failure`.
#[test]
fn outcome_from_str_unknown_defaults_to_failure() {
assert_eq!(Outcome::from_str("partial").unwrap(), Outcome::Failure);
}
// A bare JSON object is parsed on the first attempt.
#[test]
fn parse_direct_json() {
let json = r#"{"success":true,"reasoning_chain":"tried X","task_hint":"do Y"}"#;
let outcome = parse_self_judge_response(json).unwrap();
assert!(outcome.success);
assert_eq!(outcome.reasoning_chain, "tried X");
assert_eq!(outcome.task_hint, "do Y");
}
// Markdown code fences around the JSON are stripped before parsing.
#[test]
fn parse_json_with_markdown_fences() {
let response =
"```json\n{\"success\":false,\"reasoning_chain\":\"r\",\"task_hint\":\"t\"}\n```";
let outcome = parse_self_judge_response(response).unwrap();
assert!(!outcome.success);
}
// Exercises the fallback that extracts the span between the first `{` and the last `}`.
#[test]
fn parse_json_embedded_in_prose() {
let response = r#"Here is the evaluation: {"success":true,"reasoning_chain":"chain","task_hint":"hint"} — done."#;
let outcome = parse_self_judge_response(response).unwrap();
assert!(outcome.success);
}
#[test]
fn parse_invalid_returns_none() {
let outcome = parse_self_judge_response("not json at all");
assert!(outcome.is_none());
}
// Exactly three sentences pass through unchanged.
#[test]
fn trim_three_sentences_short_text() {
let text = "One. Two. Three.";
assert_eq!(trim_to_three_sentences(text), "One. Two. Three.");
}
// Anything after the third sentence end is dropped.
#[test]
fn trim_three_sentences_truncates_at_third() {
let text = "One. Two. Three. Four. Five.";
let result = trim_to_three_sentences(text);
assert!(result.ends_with("Three."), "got: {result}");
assert!(!result.contains("Four"));
}
// Text without any sentence end is still capped at 512 characters.
#[test]
fn trim_three_sentences_hard_cap() {
let long: String = "x".repeat(600);
let result = trim_to_three_sentences(&long);
assert!(result.chars().count() <= 512);
}
// Whitespace-only input trims down to the empty string.
#[test]
fn trim_three_sentences_empty() {
assert_eq!(trim_to_three_sentences(" "), "");
}
// Creates an in-memory SQLite pool holding an empty `reasoning_strategies` table with the
// columns the code under test reads and writes.
async fn make_test_pool() -> DbPool {
let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
sqlx::query(
"CREATE TABLE reasoning_strategies (
id TEXT PRIMARY KEY NOT NULL,
summary TEXT NOT NULL,
outcome TEXT NOT NULL,
task_hint TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch('now')),
last_used_at INTEGER NOT NULL DEFAULT (unixepoch('now')),
use_count INTEGER NOT NULL DEFAULT 0,
embedded_at INTEGER
)",
)
.execute(&pool)
.await
.unwrap();
pool
}
// Builds a `Success` strategy fixture whose summary and task hint are derived from `id`.
fn make_strategy(id: &str) -> ReasoningStrategy {
ReasoningStrategy {
id: id.to_owned(),
summary: format!("Summary for {id}"),
outcome: Outcome::Success,
task_hint: format!("Task hint for {id}"),
created_at: 0,
last_used_at: 0,
use_count: 0,
embedded_at: None,
}
}
// Without a vector store the embedding is unused, so an empty vector is sufficient.
#[tokio::test]
async fn insert_and_fetch_by_ids() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
let s = make_strategy("abc-123");
mem.insert(&s, vec![]).await.unwrap();
let rows = mem.fetch_by_ids(&["abc-123".to_owned()]).await.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].id, "abc-123");
assert_eq!(rows[0].outcome, Outcome::Success);
}
// Two `mark_used` calls on a fresh row leave `use_count` at 2.
#[tokio::test]
async fn mark_used_increments_count() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
let s = make_strategy("mark-1");
mem.insert(&s, vec![]).await.unwrap();
mem.mark_used(&["mark-1".to_owned()]).await.unwrap();
mem.mark_used(&["mark-1".to_owned()]).await.unwrap();
let rows = mem.fetch_by_ids(&["mark-1".to_owned()]).await.unwrap();
assert_eq!(rows[0].use_count, 2);
}
#[tokio::test]
async fn mark_used_empty_is_noop() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
mem.mark_used(&[]).await.unwrap();
}
#[tokio::test]
async fn count_returns_correct_total() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
for i in 0..5 {
mem.insert(&make_strategy(&format!("s{i}")), vec![])
.await
.unwrap();
}
assert_eq!(mem.count().await.unwrap(), 5);
}
// Five never-used (cold) rows with a limit of 3: the two-row excess is evicted.
#[tokio::test]
async fn evict_lru_cold_rows() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
for i in 0..5 {
mem.insert(&make_strategy(&format!("cold-{i}")), vec![])
.await
.unwrap();
}
let deleted = mem.evict_lru(3).await.unwrap();
assert_eq!(deleted, 2);
assert_eq!(mem.count().await.unwrap(), 3);
}
// Five rows, each marked used 11 times (above `HOT_STRATEGY_USE_COUNT`). With a limit of 3
// the count of 5 stays within the 2x ceiling of 6, so nothing may be deleted.
#[tokio::test]
async fn evict_lru_respects_hot_rows_under_ceiling() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool.clone(), None);
for i in 0..5 {
let id = format!("hot-{i}");
mem.insert(&make_strategy(&id), vec![]).await.unwrap();
let ids: Vec<String> = (0..11).map(|_| id.clone()).collect();
// One call per element, so every call increments the row once.
for chunk_ids in ids.chunks(1) {
mem.mark_used(chunk_ids).await.unwrap();
}
}
let deleted = mem.evict_lru(3).await.unwrap();
assert_eq!(deleted, 0);
assert_eq!(mem.count().await.unwrap(), 5);
}
// Seven hot rows exceed the 2x ceiling of 6 for a limit of 3, which forces eviction of hot
// rows back down to the limit.
#[tokio::test]
async fn evict_lru_hard_ceiling_forces_deletion() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool.clone(), None);
for i in 0..7 {
let id = format!("hot2-{i}");
mem.insert(&make_strategy(&id), vec![]).await.unwrap();
// `0..=HOT_STRATEGY_USE_COUNT` runs one more time than the threshold, making the row hot.
for _ in 0..=HOT_STRATEGY_USE_COUNT {
mem.mark_used(std::slice::from_ref(&id)).await.unwrap();
}
}
let deleted = mem.evict_lru(3).await.unwrap();
assert!(deleted > 0, "expected forced deletion");
let remaining = mem.count().await.unwrap();
assert_eq!(remaining, 3, "should be trimmed to store_limit");
}
#[tokio::test]
async fn evict_lru_no_op_when_under_limit() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
for i in 0..3 {
mem.insert(&make_strategy(&format!("s{i}")), vec![])
.await
.unwrap();
}
let deleted = mem.evict_lru(10).await.unwrap();
assert_eq!(deleted, 0);
}
// 500 ids exceed `MAX_IDS_PER_QUERY` (490), so `mark_used` needs two chunks; ids on both
// sides of the chunk boundary must have been updated.
#[tokio::test]
async fn mark_used_chunked_over_490_ids() {
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
for i in 0..500usize {
mem.insert(&make_strategy(&format!("chunked-{i}")), vec![])
.await
.unwrap();
}
let ids: Vec<String> = (0..500usize).map(|i| format!("chunked-{i}")).collect();
mem.mark_used(&ids).await.unwrap();
let first = mem.fetch_by_ids(&[ids[0].clone()]).await.unwrap();
let over_chunk = mem.fetch_by_ids(&[ids[490].clone()]).await.unwrap();
assert_eq!(first[0].use_count, 1, "first id should have use_count = 1");
assert_eq!(
over_chunk[0].use_count, 1,
"id past the chunk boundary should have use_count = 1"
);
}
// A mock reply that is not JSON must make `run_self_judge` return `None`.
#[tokio::test]
async fn run_self_judge_malformed_json_returns_none() {
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
"This is not JSON at all.".to_string(),
]));
let msgs = vec![Message::from_legacy(Role::User, "hello")];
let result = run_self_judge(&provider, &msgs, std::time::Duration::from_secs(5)).await;
assert!(result.is_none(), "malformed LLM response must return None");
}
// The five-sentence mock reply must come back cut after the third sentence.
#[tokio::test]
async fn distill_strategy_truncates_to_three_sentences() {
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
let long_response = "One. Two. Three. Four. Five.";
let provider = AnyProvider::Mock(MockProvider::with_responses(vec![
long_response.to_string(),
]));
let result = distill_strategy(
&provider,
Outcome::Success,
"chain here",
std::time::Duration::from_secs(5),
)
.await
.unwrap();
assert!(result.ends_with("Three."), "got: {result}");
assert!(
!result.contains("Four"),
"should not contain 4th sentence: {result}"
);
}
// With no messages `run_self_judge` returns `None`, so `process_turn` stores nothing.
#[tokio::test]
async fn process_turn_with_empty_messages_is_noop() {
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
let pool = make_test_pool().await;
let mem = ReasoningMemory::new(pool, None);
let provider = AnyProvider::Mock(MockProvider::default());
let cfg = ProcessTurnConfig {
store_limit: 100,
extraction_timeout: std::time::Duration::from_secs(1),
distill_timeout: std::time::Duration::from_secs(1),
self_judge_window: 2,
min_assistant_chars: 0,
};
let result = process_turn(&mem, &provider, &provider, &provider, &[], cfg).await;
assert!(
result.is_ok(),
"process_turn with empty messages must succeed"
);
assert_eq!(
mem.count().await.unwrap(),
0,
"no strategies should be stored"
);
}
}