1use std::str::FromStr;
34use std::time::Duration;
35
36use serde::Deserialize;
37use tokio::time::timeout;
38use zeph_db::{ActiveDialect, DbPool, placeholder_list};
39use zeph_llm::any::AnyProvider;
40use zeph_llm::provider::{LlmProvider as _, Message, Role};
41
42use crate::error::MemoryError;
43use crate::vector_store::VectorStore;
44
/// Use-count threshold separating "cold" strategies from "hot" ones.
///
/// `delete_oldest_cold` only removes rows whose `use_count` is less than or
/// equal to this value; rows above it survive cold eviction.
const HOT_STRATEGY_USE_COUNT: i64 = 10;
50
/// Maximum number of ids bound into a single `IN (...)` clause.
///
/// `mark_used` and `fetch_by_ids` split their input into chunks of at most this
/// size and issue one statement per chunk.
// NOTE(review): presumably chosen to stay below a backend bind-parameter limit — confirm.
const MAX_IDS_PER_QUERY: usize = 490;
53
/// System prompt for the self-judge LLM call made by `run_self_judge`.
///
/// It asks for a bare JSON object with the keys `success`, `reasoning_chain`
/// and `task_hint`, which is what `SelfJudgeOutcome` deserializes.
const SELF_JUDGE_SYSTEM: &str = "\
You are a task outcome evaluator. Given an agent turn transcript, analyze the conversation and determine:
1. Did the agent successfully complete the user's request? (true/false)
2. Extract the key reasoning steps the agent took (reasoning chain).
3. Summarize the task in one sentence (task hint).

Respond ONLY with valid JSON, no markdown fences, no prose:
{\"success\": bool, \"reasoning_chain\": \"string\", \"task_hint\": \"string\"}";
66
/// System prompt for the distillation LLM call made by `distill_strategy`.
///
/// The model is asked for plain strategy text of at most three sentences; the
/// caller additionally post-processes the reply with `trim_to_three_sentences`.
const DISTILL_SYSTEM: &str = "\
You are a strategy distiller. Given a reasoning chain from an agent turn, distill it into \
a short generalizable strategy (at most 3 sentences) that could help an agent facing a similar \
task. Focus on the transferable principle, not the specific instance. \
Respond with the strategy text only — no headers, no lists, no markdown.";
75
/// Result of an agent turn as judged by the self-judge step.
///
/// Stored in the `outcome` column as the string produced by [`Outcome::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum Outcome {
    /// The agent completed the user's request (stored as `"success"`).
    Success,
    /// The agent did not complete the user's request (stored as `"failure"`).
    Failure,
}
87
88impl Outcome {
89 #[must_use]
91 pub fn as_str(self) -> &'static str {
92 match self {
93 Outcome::Success => "success",
94 Outcome::Failure => "failure",
95 }
96 }
97}
98
/// Error type declared as `FromStr::Err` for [`Outcome`].
///
/// The wrapped string is the rejected input. The `FromStr` implementation in
/// this file falls back to `Outcome::Failure` for unknown input and therefore
/// never constructs this error.
#[derive(Debug, thiserror::Error)]
#[error("unknown outcome: {0}")]
pub struct OutcomeParseError(String);
103
104impl FromStr for Outcome {
105 type Err = OutcomeParseError;
106
107 fn from_str(s: &str) -> Result<Self, Self::Err> {
108 match s {
109 "success" => Ok(Outcome::Success),
110 "failure" => Ok(Outcome::Failure),
111 other => {
112 tracing::warn!(
113 value = other,
114 "reasoning: unknown outcome, defaulting to Failure"
115 );
116 Ok(Outcome::Failure)
117 }
118 }
119 }
120}
121
/// One row of the `reasoning_strategies` table: a distilled strategy plus its
/// usage bookkeeping.
#[derive(Debug, Clone)]
pub struct ReasoningStrategy {
    /// Primary key. `process_turn` generates a UUID v4 string for new rows.
    pub id: String,
    /// Distilled strategy text.
    pub summary: String,
    /// Whether the originating turn was judged a success or a failure.
    pub outcome: Outcome,
    /// One-sentence description of the task the strategy came from.
    pub task_hint: String,
    /// Creation timestamp. `ReasoningMemory::insert` lets the database fill
    /// this in (dialect `EPOCH_NOW`); the value on the struct is not written.
    pub created_at: i64,
    /// Timestamp of the last insert/upsert or `mark_used` call; also assigned
    /// by the database rather than taken from the struct on insert.
    pub last_used_at: i64,
    /// Number of times the row was bumped by `mark_used`; starts at 0 on insert.
    pub use_count: i64,
    /// Timestamp of the last successful vector-store upsert, or `None` when the
    /// strategy has no stored embedding.
    pub embedded_at: Option<i64>,
}
147
/// Structured verdict deserialized from the self-judge LLM reply.
///
/// The field names match the JSON keys requested by `SELF_JUDGE_SYSTEM`.
#[derive(Debug, Deserialize)]
pub struct SelfJudgeOutcome {
    /// `true` when the model judged the user's request as completed.
    pub success: bool,
    /// Key reasoning steps the agent took, as reported by the model.
    pub reasoning_chain: String,
    /// One-sentence summary of the task, as reported by the model.
    pub task_hint: String,
}
162
/// Store for reasoning strategies: a SQL table for the rows plus an optional
/// vector store for similarity search.
pub struct ReasoningMemory {
    /// Database pool holding the `reasoning_strategies` table.
    pool: DbPool,
    /// Optional vector backend. When `None`, inserts only touch SQL and
    /// `retrieve_by_embedding` returns an empty list.
    vector_store: Option<std::sync::Arc<dyn VectorStore>>,
}
175
/// Name of the vector-store collection used for strategy upserts and searches.
pub const REASONING_COLLECTION: &str = "reasoning_strategies";
178
179impl ReasoningMemory {
180 #[must_use]
195 pub fn new(pool: DbPool, vector_store: Option<std::sync::Arc<dyn VectorStore>>) -> Self {
196 Self { pool, vector_store }
197 }
198
199 #[tracing::instrument(name = "memory.reasoning.insert", skip(self, embedding), fields(id = %strategy.id))]
209 pub async fn insert(
210 &self,
211 strategy: &ReasoningStrategy,
212 embedding: Vec<f32>,
213 ) -> Result<(), MemoryError> {
214 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
215 let raw = format!(
216 "INSERT INTO reasoning_strategies \
217 (id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at) \
218 VALUES (?, ?, ?, ?, {epoch_now}, {epoch_now}, 0, NULL) \
219 ON CONFLICT (id) DO UPDATE SET \
220 summary = EXCLUDED.summary, \
221 outcome = EXCLUDED.outcome, \
222 task_hint = EXCLUDED.task_hint, \
223 last_used_at = EXCLUDED.last_used_at, \
224 embedded_at = EXCLUDED.embedded_at"
225 );
226 let sql = zeph_db::rewrite_placeholders(&raw);
227 zeph_db::query(&sql)
228 .bind(&strategy.id)
229 .bind(&strategy.summary)
230 .bind(strategy.outcome.as_str())
231 .bind(&strategy.task_hint)
232 .execute(&self.pool)
233 .await?;
234
235 if let Some(ref vs) = self.vector_store {
237 let point = crate::vector_store::VectorPoint {
238 id: strategy.id.clone(),
239 vector: embedding,
240 payload: std::collections::HashMap::from([
241 (
242 "outcome".to_owned(),
243 serde_json::Value::String(strategy.outcome.as_str().to_owned()),
244 ),
245 (
246 "task_hint".to_owned(),
247 serde_json::Value::String(strategy.task_hint.clone()),
248 ),
249 ]),
250 };
251 if let Err(e) = vs.upsert(REASONING_COLLECTION, vec![point]).await {
252 tracing::warn!(error = %e, id = %strategy.id, "reasoning: Qdrant upsert failed — SQLite-only mode");
253 } else {
254 let update_sql = zeph_db::rewrite_placeholders(&format!(
256 "UPDATE reasoning_strategies SET embedded_at = {epoch_now} WHERE id = ?"
257 ));
258 if let Err(e) = zeph_db::query(&update_sql)
259 .bind(&strategy.id)
260 .execute(&self.pool)
261 .await
262 {
263 tracing::warn!(error = %e, "reasoning: failed to set embedded_at");
264 }
265 }
266 }
267
268 tracing::debug!(id = %strategy.id, outcome = strategy.outcome.as_str(), "reasoning: strategy inserted");
269 Ok(())
270 }
271
272 #[tracing::instrument(
284 name = "memory.reasoning.retrieve_by_embedding",
285 skip(self, embedding),
286 fields(top_k)
287 )]
288 pub async fn retrieve_by_embedding(
289 &self,
290 embedding: &[f32],
291 top_k: u64,
292 ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
293 let Some(ref vs) = self.vector_store else {
294 return Ok(Vec::new());
295 };
296
297 let scored = vs
298 .search(REASONING_COLLECTION, embedding.to_vec(), top_k, None)
299 .await?;
300
301 if scored.is_empty() {
302 return Ok(Vec::new());
303 }
304
305 let ids: Vec<String> = scored.into_iter().map(|p| p.id).collect();
306 self.fetch_by_ids(&ids).await
307 }
308
309 #[tracing::instrument(name = "memory.reasoning.mark_used", skip(self), fields(n = ids.len()))]
319 pub async fn mark_used(&self, ids: &[String]) -> Result<(), MemoryError> {
320 if ids.is_empty() {
321 return Ok(());
322 }
323
324 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
325 for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
326 let ph = placeholder_list(1, chunk.len());
327 let sql = format!(
330 "UPDATE reasoning_strategies \
331 SET use_count = use_count + 1, last_used_at = {epoch_now} \
332 WHERE id IN ({ph})"
333 );
334 let mut q = zeph_db::query(&sql);
335 for id in chunk {
336 q = q.bind(id.as_str());
337 }
338 q.execute(&self.pool).await?;
339 }
340
341 Ok(())
342 }
343
344 #[tracing::instrument(name = "memory.reasoning.evict_lru", skip(self), fields(store_limit))]
360 pub async fn evict_lru(&self, store_limit: usize) -> Result<usize, MemoryError> {
361 let count = self.count().await?;
362 if count <= store_limit {
363 return Ok(0);
364 }
365
366 let over_by = count - store_limit;
367 let deleted_cold = self.delete_oldest_cold(over_by).await?;
368 if deleted_cold > 0 {
369 tracing::debug!(
371 deleted = deleted_cold,
372 count,
373 "reasoning: evicted cold strategies"
374 );
375 return Ok(deleted_cold);
376 }
377
378 let hard_ceiling = store_limit.saturating_mul(2);
380 if count <= hard_ceiling {
381 tracing::debug!(
382 count,
383 store_limit,
384 "reasoning: hot saturation — growth allowed under 2x ceiling"
385 );
386 return Ok(0);
387 }
388
389 let forced = count - store_limit;
391 let deleted_forced = self.delete_oldest_unconditional(forced).await?;
392 tracing::warn!(
393 deleted = deleted_forced,
394 count,
395 hard_ceiling,
396 "reasoning: hard-ceiling eviction — evicted hot strategies; consider raising store_limit"
397 );
398
399 Ok(deleted_forced)
400 }
401
402 pub async fn count(&self) -> Result<usize, MemoryError> {
408 let row: (i64,) = zeph_db::query_as("SELECT COUNT(*) FROM reasoning_strategies")
409 .fetch_one(&self.pool)
410 .await?;
411 Ok(usize::try_from(row.0.max(0)).unwrap_or(0))
412 }
413
414 pub(crate) async fn fetch_by_ids(
418 &self,
419 ids: &[String],
420 ) -> Result<Vec<ReasoningStrategy>, MemoryError> {
421 if ids.is_empty() {
422 return Ok(Vec::new());
423 }
424
425 let mut strategies = Vec::with_capacity(ids.len());
426 for chunk in ids.chunks(MAX_IDS_PER_QUERY) {
427 let ph = placeholder_list(1, chunk.len());
428 let sql = format!(
430 "SELECT id, summary, outcome, task_hint, created_at, last_used_at, use_count, embedded_at \
431 FROM reasoning_strategies WHERE id IN ({ph})"
432 );
433 let mut q = zeph_db::query_as::<
434 _,
435 (String, String, String, String, i64, i64, i64, Option<i64>),
436 >(&sql);
437 for id in chunk {
438 q = q.bind(id.as_str());
439 }
440 let rows = q.fetch_all(&self.pool).await?;
441 for (
442 id,
443 summary,
444 outcome_str,
445 task_hint,
446 created_at,
447 last_used_at,
448 use_count,
449 embedded_at,
450 ) in rows
451 {
452 let outcome = Outcome::from_str(&outcome_str).unwrap_or(Outcome::Failure);
453 strategies.push(ReasoningStrategy {
454 id,
455 summary,
456 outcome,
457 task_hint,
458 created_at,
459 last_used_at,
460 use_count,
461 embedded_at,
462 });
463 }
464 }
465
466 Ok(strategies)
467 }
468
469 async fn delete_oldest_cold(&self, n: usize) -> Result<usize, MemoryError> {
473 let limit = i64::try_from(n).unwrap_or(i64::MAX);
474 let raw = format!(
476 "DELETE FROM reasoning_strategies \
477 WHERE id IN ( \
478 SELECT id FROM reasoning_strategies \
479 WHERE use_count <= {HOT_STRATEGY_USE_COUNT} \
480 ORDER BY last_used_at ASC LIMIT ? \
481 )"
482 );
483 let sql = zeph_db::rewrite_placeholders(&raw);
484 let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
485 Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
486 }
487
488 async fn delete_oldest_unconditional(&self, n: usize) -> Result<usize, MemoryError> {
492 let limit = i64::try_from(n).unwrap_or(i64::MAX);
493 let raw = "DELETE FROM reasoning_strategies \
494 WHERE id IN ( \
495 SELECT id FROM reasoning_strategies \
496 ORDER BY last_used_at ASC LIMIT ? \
497 )";
498 let sql = zeph_db::rewrite_placeholders(raw);
499 let result = zeph_db::query(&sql).bind(limit).execute(&self.pool).await?;
500 Ok(usize::try_from(result.rows_affected()).unwrap_or(0))
501 }
502}
503
504#[tracing::instrument(name = "memory.reasoning.self_judge", skip(provider, messages), fields(n = messages.len()))]
529pub async fn run_self_judge(
530 provider: &AnyProvider,
531 messages: &[Message],
532 extraction_timeout: Duration,
533) -> Option<SelfJudgeOutcome> {
534 if messages.is_empty() {
535 return None;
536 }
537
538 let user_prompt = build_transcript_prompt(messages);
539
540 let llm_messages = [
541 Message::from_legacy(Role::System, SELF_JUDGE_SYSTEM),
542 Message::from_legacy(Role::User, user_prompt),
543 ];
544
545 let response = match timeout(extraction_timeout, provider.chat(&llm_messages)).await {
546 Ok(Ok(text)) => text,
547 Ok(Err(e)) => {
548 tracing::warn!(error = %e, "reasoning: self-judge LLM call failed");
549 return None;
550 }
551 Err(_) => {
552 tracing::warn!("reasoning: self-judge timed out");
553 return None;
554 }
555 };
556
557 parse_self_judge_response(&response)
558}
559
560#[tracing::instrument(name = "memory.reasoning.distill", skip(provider, reasoning_chain))]
580pub async fn distill_strategy(
581 provider: &AnyProvider,
582 outcome: Outcome,
583 reasoning_chain: &str,
584 distill_timeout: Duration,
585) -> Option<String> {
586 if reasoning_chain.is_empty() {
587 return None;
588 }
589
590 let user_prompt = format!(
591 "Outcome: {}\n\nReasoning chain:\n{reasoning_chain}",
592 outcome.as_str()
593 );
594
595 let llm_messages = [
596 Message::from_legacy(Role::System, DISTILL_SYSTEM),
597 Message::from_legacy(Role::User, user_prompt),
598 ];
599
600 let response = match timeout(distill_timeout, provider.chat(&llm_messages)).await {
601 Ok(Ok(text)) => text,
602 Ok(Err(e)) => {
603 tracing::warn!(error = %e, "reasoning: distillation LLM call failed");
604 return None;
605 }
606 Err(_) => {
607 tracing::warn!("reasoning: distillation timed out");
608 return None;
609 }
610 };
611
612 let trimmed = trim_to_three_sentences(&response);
613 if trimmed.is_empty() {
614 None
615 } else {
616 Some(trimmed)
617 }
618}
619
/// Tunables for [`process_turn`].
#[derive(Debug, Clone, Copy)]
pub struct ProcessTurnConfig {
    /// Target maximum number of stored strategies; passed to `evict_lru`.
    pub store_limit: usize,
    /// Time budget for the self-judge LLM call.
    pub extraction_timeout: Duration,
    /// Time budget for the distillation LLM call.
    pub distill_timeout: Duration,
    /// Time budget for computing the embedding.
    pub embed_timeout: Duration,
    /// Number of most recent messages handed to the self-judge step.
    pub self_judge_window: usize,
    /// Minimum length of the latest assistant message inside the window;
    /// turns below it are skipped without calling any provider.
    pub min_assistant_chars: usize,
}
641
642#[tracing::instrument(name = "memory.reasoning.process_turn", skip_all)]
653pub async fn process_turn(
654 memory: &ReasoningMemory,
655 extract_provider: &AnyProvider,
656 distill_provider: &AnyProvider,
657 embed_provider: &AnyProvider,
658 messages: &[Message],
659 cfg: ProcessTurnConfig,
660) -> Result<(), MemoryError> {
661 let ProcessTurnConfig {
662 store_limit,
663 extraction_timeout,
664 distill_timeout,
665 embed_timeout,
666 self_judge_window,
667 min_assistant_chars,
668 } = cfg;
669
670 let judge_messages = if messages.len() > self_judge_window {
673 &messages[messages.len() - self_judge_window..]
674 } else {
675 messages
676 };
677
678 let last_assistant_chars = judge_messages
680 .iter()
681 .rev()
682 .find(|m| m.role == Role::Assistant)
683 .map_or(0, |m| m.content.len());
684 if last_assistant_chars < min_assistant_chars {
685 return Ok(());
686 }
687
688 let Some(outcome) = run_self_judge(extract_provider, judge_messages, extraction_timeout).await
689 else {
690 return Ok(());
691 };
692
693 let outcome_enum = if outcome.success {
694 Outcome::Success
695 } else {
696 Outcome::Failure
697 };
698
699 let Some(summary) = distill_strategy(
700 distill_provider,
701 outcome_enum,
702 &outcome.reasoning_chain,
703 distill_timeout,
704 )
705 .await
706 else {
707 return Ok(());
708 };
709
710 let embed_input = format!("{}\n{}", outcome.task_hint, summary);
712 let embedding =
713 match tokio::time::timeout(embed_timeout, embed_provider.embed(&embed_input)).await {
714 Ok(Ok(v)) => v,
715 Ok(Err(e)) => {
716 tracing::warn!(error = %e, "reasoning: embedding failed — strategy not stored");
717 return Ok(());
718 }
719 Err(_) => {
720 tracing::warn!("reasoning: embed timed out — strategy not stored");
721 return Ok(());
722 }
723 };
724
725 let id = uuid::Uuid::new_v4().to_string();
726 let strategy = ReasoningStrategy {
727 id,
728 summary,
729 outcome: outcome_enum,
730 task_hint: outcome.task_hint,
731 created_at: 0, last_used_at: 0,
733 use_count: 0,
734 embedded_at: None,
735 };
736
737 let count_before = memory.count().await.unwrap_or(0);
742
743 if let Err(e) = memory.insert(&strategy, embedding).await {
744 tracing::warn!(error = %e, "reasoning: insert failed");
745 return Ok(());
746 }
747
748 if count_before >= store_limit
749 && let Err(e) = memory.evict_lru(store_limit).await
750 {
751 tracing::warn!(error = %e, "reasoning: evict_lru failed");
752 }
753
754 Ok(())
755}
756
/// Maximum number of characters of each message copied into the self-judge
/// transcript; longer messages are cut at this many characters.
const MAX_TRANSCRIPT_MESSAGE_CHARS: usize = 2000;
764
765fn build_transcript_prompt(messages: &[Message]) -> String {
771 let mut prompt = String::from("Agent turn messages:\n");
772 for (i, msg) in messages.iter().enumerate() {
773 use std::fmt::Write as _;
774 let role = format!("{:?}", msg.role);
775 let content: std::borrow::Cow<str> =
777 if msg.content.chars().count() > MAX_TRANSCRIPT_MESSAGE_CHARS {
778 msg.content
779 .char_indices()
780 .nth(MAX_TRANSCRIPT_MESSAGE_CHARS)
781 .map_or(msg.content.as_str().into(), |(byte_idx, _)| {
782 msg.content[..byte_idx].into()
783 })
784 } else {
785 msg.content.as_str().into()
786 };
787 let _ = writeln!(prompt, "[{}] {}: {}", i + 1, role, content);
788 }
789 prompt.push_str("\nEvaluate this turn and return JSON.");
790 prompt
791}
792
793fn parse_self_judge_response(response: &str) -> Option<SelfJudgeOutcome> {
798 let stripped = response
800 .trim()
801 .trim_start_matches("```json")
802 .trim_start_matches("```")
803 .trim_end_matches("```")
804 .trim();
805
806 if let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(stripped) {
807 return Some(v);
808 }
809
810 if let (Some(start), Some(end)) = (stripped.find('{'), stripped.rfind('}'))
812 && end > start
813 && let Ok(v) = serde_json::from_str::<SelfJudgeOutcome>(&stripped[start..=end])
814 {
815 return Some(v);
816 }
817
818 tracing::warn!(
819 "reasoning: failed to parse self-judge response (len={}): {:.200}",
820 response.len(),
821 response
822 );
823 None
824}
825
/// Limits `text` to at most three sentences and at most 512 characters.
///
/// Surrounding whitespace is trimmed first. A sentence ends at `.`, `!` or `?`
/// when that character is followed by whitespace or is the last character, so
/// a dot inside a token such as `v1.2` does not count. When a third sentence
/// end exists the text is cut right after it; when the text holds fewer than
/// three sentences it is kept whole, including any unterminated tail. The
/// result is then capped at 512 characters on a character boundary.
fn trim_to_three_sentences(text: &str) -> String {
    const MAX_CHARS: usize = 512;
    const MAX_SENTENCES: usize = 3;

    let text = text.trim();

    // Byte offset just past the terminator of the third sentence, if there is one.
    let mut sentences_seen = 0usize;
    let mut cut_at: Option<usize> = None;
    let mut chars = text.char_indices().peekable();
    while let Some((idx, ch)) = chars.next() {
        if !matches!(ch, '.' | '!' | '?') {
            continue;
        }
        let at_boundary = match chars.peek() {
            Some(&(_, next)) => next.is_whitespace(),
            None => true,
        };
        if at_boundary {
            sentences_seen += 1;
            if sentences_seen == MAX_SENTENCES {
                cut_at = Some(idx + ch.len_utf8());
                break;
            }
        }
    }

    let kept = match cut_at {
        Some(end) => &text[..end],
        None => text,
    };

    // Hard cap, applied on a character boundary so multi-byte text is never split.
    match kept.char_indices().nth(MAX_CHARS) {
        Some((byte_idx, _)) => kept[..byte_idx].to_owned(),
        None => kept.to_owned(),
    }
}
864
#[cfg(test)]
mod tests {
    use super::*;

    // ── Outcome ────────────────────────────────────────────────────────────

    /// `as_str` yields the lowercase labels used in storage.
    #[test]
    fn outcome_as_str_round_trip() {
        for (outcome, label) in [(Outcome::Success, "success"), (Outcome::Failure, "failure")] {
            assert_eq!(outcome.as_str(), label);
        }
    }

    #[test]
    fn outcome_from_str_success() {
        let parsed = Outcome::from_str("success").unwrap();
        assert_eq!(parsed, Outcome::Success);
    }

    #[test]
    fn outcome_from_str_failure() {
        let parsed = Outcome::from_str("failure").unwrap();
        assert_eq!(parsed, Outcome::Failure);
    }

    /// Unknown labels are accepted and mapped to `Failure`.
    #[test]
    fn outcome_from_str_unknown_defaults_to_failure() {
        let parsed = Outcome::from_str("partial").unwrap();
        assert_eq!(parsed, Outcome::Failure);
    }

    // ── parse_self_judge_response ──────────────────────────────────────────

    #[test]
    fn parse_direct_json() {
        let reply = r#"{"success":true,"reasoning_chain":"tried X","task_hint":"do Y"}"#;
        let verdict = parse_self_judge_response(reply).unwrap();
        assert!(verdict.success);
        assert_eq!(verdict.reasoning_chain, "tried X");
        assert_eq!(verdict.task_hint, "do Y");
    }

    #[test]
    fn parse_json_with_markdown_fences() {
        let reply =
            "```json\n{\"success\":false,\"reasoning_chain\":\"r\",\"task_hint\":\"t\"}\n```";
        let verdict = parse_self_judge_response(reply).unwrap();
        assert!(!verdict.success);
    }

    #[test]
    fn parse_json_embedded_in_prose() {
        let reply = r#"Here is the evaluation: {"success":true,"reasoning_chain":"chain","task_hint":"hint"} — done."#;
        let verdict = parse_self_judge_response(reply).unwrap();
        assert!(verdict.success);
    }

    #[test]
    fn parse_invalid_returns_none() {
        assert!(parse_self_judge_response("not json at all").is_none());
    }

    // ── trim_to_three_sentences ────────────────────────────────────────────

    #[test]
    fn trim_three_sentences_short_text() {
        let input = "One. Two. Three.";
        assert_eq!(trim_to_three_sentences(input), "One. Two. Three.");
    }

    #[test]
    fn trim_three_sentences_truncates_at_third() {
        let result = trim_to_three_sentences("One. Two. Three. Four. Five.");
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(!result.contains("Four"));
    }

    #[test]
    fn trim_three_sentences_hard_cap() {
        let unbroken = "x".repeat(600);
        let result = trim_to_three_sentences(&unbroken);
        assert!(result.chars().count() <= 512);
    }

    #[test]
    fn trim_three_sentences_empty() {
        assert_eq!(trim_to_three_sentences("   "), "");
    }

    // ── database-backed helpers ────────────────────────────────────────────

    /// Opens an in-memory SQLite database containing an empty
    /// `reasoning_strategies` table.
    async fn make_test_pool() -> DbPool {
        let pool = sqlx::SqlitePool::connect(":memory:").await.unwrap();
        sqlx::query(
            "CREATE TABLE reasoning_strategies (
                id           TEXT    PRIMARY KEY NOT NULL,
                summary      TEXT    NOT NULL,
                outcome      TEXT    NOT NULL,
                task_hint    TEXT    NOT NULL,
                created_at   INTEGER NOT NULL DEFAULT (unixepoch('now')),
                last_used_at INTEGER NOT NULL DEFAULT (unixepoch('now')),
                use_count    INTEGER NOT NULL DEFAULT 0,
                embedded_at  INTEGER
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        pool
    }

    /// Builds a successful strategy whose text fields are derived from `id`.
    fn make_strategy(id: &str) -> ReasoningStrategy {
        ReasoningStrategy {
            id: id.to_owned(),
            summary: format!("Summary for {id}"),
            outcome: Outcome::Success,
            task_hint: format!("Task hint for {id}"),
            created_at: 0,
            last_used_at: 0,
            use_count: 0,
            embedded_at: None,
        }
    }

    /// Creates a SQL-only store (no vector backend) on a fresh database.
    async fn sql_only_memory() -> ReasoningMemory {
        ReasoningMemory::new(make_test_pool().await, None)
    }

    /// Inserts `n` strategies with ids `{prefix}0 .. {prefix}{n-1}`.
    async fn insert_numbered(mem: &ReasoningMemory, prefix: &str, n: usize) {
        for i in 0..n {
            mem.insert(&make_strategy(&format!("{prefix}{i}")), vec![])
                .await
                .unwrap();
        }
    }

    /// Calls `mark_used` for a single id `times` times in a row.
    async fn bump(mem: &ReasoningMemory, id: &String, times: i64) {
        for _ in 0..times {
            mem.mark_used(std::slice::from_ref(id)).await.unwrap();
        }
    }

    #[tokio::test]
    async fn insert_and_fetch_by_ids() {
        let mem = sql_only_memory().await;

        mem.insert(&make_strategy("abc-123"), vec![]).await.unwrap();

        let fetched = mem.fetch_by_ids(&["abc-123".to_owned()]).await.unwrap();
        assert_eq!(fetched.len(), 1);
        assert_eq!(fetched[0].id, "abc-123");
        assert_eq!(fetched[0].outcome, Outcome::Success);
    }

    #[tokio::test]
    async fn mark_used_increments_count() {
        let mem = sql_only_memory().await;
        let id = "mark-1".to_owned();

        mem.insert(&make_strategy(&id), vec![]).await.unwrap();
        bump(&mem, &id, 2).await;

        let fetched = mem.fetch_by_ids(std::slice::from_ref(&id)).await.unwrap();
        assert_eq!(fetched[0].use_count, 2);
    }

    /// An empty id list must succeed without touching the database.
    #[tokio::test]
    async fn mark_used_empty_is_noop() {
        let mem = sql_only_memory().await;
        mem.mark_used(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn count_returns_correct_total() {
        let mem = sql_only_memory().await;

        insert_numbered(&mem, "s", 5).await;

        assert_eq!(mem.count().await.unwrap(), 5);
    }

    /// Never-used rows are cold, so the excess over the limit is evicted.
    #[tokio::test]
    async fn evict_lru_cold_rows() {
        let mem = sql_only_memory().await;

        insert_numbered(&mem, "cold-", 5).await;

        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(mem.count().await.unwrap(), 3);
    }

    /// Five hot rows with limit 3 stay under the 2x ceiling, so nothing goes.
    #[tokio::test]
    async fn evict_lru_respects_hot_rows_under_ceiling() {
        let mem = sql_only_memory().await;

        for i in 0..5 {
            let id = format!("hot-{i}");
            mem.insert(&make_strategy(&id), vec![]).await.unwrap();
            // Eleven single-id calls push use_count past the hot threshold.
            bump(&mem, &id, 11).await;
        }

        let deleted = mem.evict_lru(3).await.unwrap();
        assert_eq!(deleted, 0);
        assert_eq!(mem.count().await.unwrap(), 5);
    }

    /// Seven hot rows with limit 3 exceed the 2x ceiling and are trimmed.
    #[tokio::test]
    async fn evict_lru_hard_ceiling_forces_deletion() {
        let mem = sql_only_memory().await;

        for i in 0..7 {
            let id = format!("hot2-{i}");
            mem.insert(&make_strategy(&id), vec![]).await.unwrap();
            bump(&mem, &id, HOT_STRATEGY_USE_COUNT + 1).await;
        }

        let deleted = mem.evict_lru(3).await.unwrap();
        assert!(deleted > 0, "expected forced deletion");
        let remaining = mem.count().await.unwrap();
        assert_eq!(remaining, 3, "should be trimmed to store_limit");
    }

    #[tokio::test]
    async fn evict_lru_no_op_when_under_limit() {
        let mem = sql_only_memory().await;

        insert_numbered(&mem, "s", 3).await;

        let deleted = mem.evict_lru(10).await.unwrap();
        assert_eq!(deleted, 0);
    }

    /// 500 ids span two chunks; ids on both sides of the boundary are updated.
    #[tokio::test]
    async fn mark_used_chunked_over_490_ids() {
        let mem = sql_only_memory().await;

        insert_numbered(&mem, "chunked-", 500).await;

        let ids: Vec<String> = (0..500usize).map(|i| format!("chunked-{i}")).collect();
        mem.mark_used(&ids).await.unwrap();

        let first = mem.fetch_by_ids(&[ids[0].clone()]).await.unwrap();
        let over_chunk = mem.fetch_by_ids(&[ids[490].clone()]).await.unwrap();
        assert_eq!(first[0].use_count, 1, "first id should have use_count = 1");
        assert_eq!(
            over_chunk[0].use_count, 1,
            "id past the chunk boundary should have use_count = 1"
        );
    }

    // ── LLM-backed helpers (mock provider) ─────────────────────────────────

    #[tokio::test]
    async fn run_self_judge_malformed_json_returns_none() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let canned = vec!["This is not JSON at all.".to_string()];
        let provider = AnyProvider::Mock(MockProvider::with_responses(canned));
        let transcript = vec![Message::from_legacy(Role::User, "hello")];

        let result =
            run_self_judge(&provider, &transcript, std::time::Duration::from_secs(5)).await;
        assert!(result.is_none(), "malformed LLM response must return None");
    }

    #[tokio::test]
    async fn distill_strategy_truncates_to_three_sentences() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let canned = vec!["One. Two. Three. Four. Five.".to_string()];
        let provider = AnyProvider::Mock(MockProvider::with_responses(canned));

        let result = distill_strategy(
            &provider,
            Outcome::Success,
            "chain here",
            std::time::Duration::from_secs(5),
        )
        .await
        .unwrap();
        assert!(result.ends_with("Three."), "got: {result}");
        assert!(
            !result.contains("Four"),
            "should not contain 4th sentence: {result}"
        );
    }

    #[tokio::test]
    async fn process_turn_with_empty_messages_is_noop() {
        use zeph_llm::any::AnyProvider;
        use zeph_llm::mock::MockProvider;

        let mem = sql_only_memory().await;
        let provider = AnyProvider::Mock(MockProvider::default());
        let cfg = ProcessTurnConfig {
            store_limit: 100,
            extraction_timeout: std::time::Duration::from_secs(1),
            distill_timeout: std::time::Duration::from_secs(1),
            embed_timeout: std::time::Duration::from_secs(5),
            self_judge_window: 2,
            min_assistant_chars: 0,
        };

        let result = process_turn(&mem, &provider, &provider, &provider, &[], cfg).await;
        assert!(
            result.is_ok(),
            "process_turn with empty messages must succeed"
        );
        assert_eq!(
            mem.count().await.unwrap(),
            0,
            "no strategies should be stored"
        );
    }
}