1use std::time::Duration;
11
12use blake3;
13use serde::{Deserialize, Serialize};
14use zeph_config::PlanCacheConfig;
15use zeph_db::DbPool;
16#[cfg(any(feature = "sqlite", feature = "postgres"))]
17use zeph_db::sql;
18use zeph_llm::provider::{LlmProvider, Message, Role};
19
20use super::dag;
21use super::error::OrchestrationError;
22use super::graph::{FailureStrategy, PlanSlug, TaskGraph};
23use super::planner::{PlannerResponse, convert_response_pub};
24use zeph_subagent::SubAgentDef;
25
/// One task of a [`PlanTemplate`]: the structural part of a task node, without runtime state.
///
/// Tasks reference each other through [`PlanSlug`]s (`task_id` / `depends_on`) instead of
/// numeric ids; the template's tasks are serialized to JSON both for storage and for the
/// adaptation prompt sent to the LLM.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemplateTask {
    /// Task title, copied from the source task node.
    pub title: String,
    /// Task description, copied from the source task node.
    pub description: String,
    /// Optional agent hint copied from the source task node; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_hint: Option<String>,
    /// Slugs (`task_id` values) of the tasks this task depends on; omitted from JSON when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub depends_on: Vec<PlanSlug>,
    /// Optional per-task failure strategy; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub failure_strategy: Option<FailureStrategy>,
    /// Slug identifying this task inside the template (built from title + id by
    /// `PlanTemplate::from_task_graph`).
    pub task_id: PlanSlug,
}
47
/// A reusable snapshot of a task plan with all runtime state stripped, stored as JSON in
/// the `plan_cache` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanTemplate {
    /// Goal the plan was created for; `from_task_graph` stores it in `normalize_goal` form.
    pub goal: String,
    /// Template tasks, in the same order as the tasks of the source graph.
    pub tasks: Vec<TemplateTask>,
}
72
73impl PlanTemplate {
74 pub fn from_task_graph(graph: &TaskGraph) -> Result<Self, OrchestrationError> {
80 if graph.tasks.is_empty() {
81 return Err(OrchestrationError::PlanningFailed(
82 "cannot cache a plan with zero tasks".into(),
83 ));
84 }
85
86 let id_to_slug: Vec<PlanSlug> = graph
88 .tasks
89 .iter()
90 .map(|n| PlanSlug::from(slugify_title(&n.title, n.id.as_u32())))
91 .collect();
92
93 let tasks = graph
94 .tasks
95 .iter()
96 .enumerate()
97 .map(|(i, node)| TemplateTask {
98 title: node.title.clone(),
99 description: node.description.clone(),
100 agent_hint: node.agent_hint.clone(),
101 depends_on: node
102 .depends_on
103 .iter()
104 .map(|dep| id_to_slug[dep.index()].clone())
105 .collect(),
106 failure_strategy: node.failure_strategy,
107 task_id: id_to_slug[i].clone(),
108 })
109 .collect();
110
111 Ok(Self {
112 goal: normalize_goal(&graph.goal),
113 tasks,
114 })
115 }
116}
117
/// Canonicalizes a goal string for hashing and storage.
///
/// Leading and trailing whitespace is removed, every internal run of whitespace becomes
/// a single ASCII space, and each character is lowercased individually with
/// `char::to_lowercase`. This is deliberately not `str::to_lowercase`, which would apply
/// context-sensitive rules such as the Greek final sigma.
#[must_use]
pub fn normalize_goal(text: &str) -> String {
    let mut normalized = String::with_capacity(text.len());
    for (position, word) in text.split_whitespace().enumerate() {
        if position > 0 {
            normalized.push(' ');
        }
        normalized.extend(word.chars().flat_map(char::to_lowercase));
    }
    normalized
}
143
144#[must_use]
146pub fn goal_hash(normalized: &str) -> String {
147 blake3::hash(normalized.as_bytes()).to_hex().to_string()
148}
149
/// Builds a kebab-case slug `"<words>-<idx>"` from a task title.
///
/// ASCII alphanumerics are kept (lowercased); every other character, including
/// non-ASCII ones, acts as a separator, and empty segments are dropped. The joined
/// words are capped at 32 bytes, which is safe to slice because they are pure ASCII,
/// and any dash left dangling by the cap is trimmed. Titles with no usable characters
/// fall back to `"task-<idx>"`.
fn slugify_title(title: &str, idx: u32) -> String {
    let words: Vec<String> = title
        .split(|c: char| !c.is_ascii_alphanumeric())
        .filter(|word| !word.is_empty())
        .map(str::to_ascii_lowercase)
        .collect();
    let joined = words.join("-");

    let stem = joined.get(..32).unwrap_or(&joined).trim_end_matches('-');
    match stem {
        "" => format!("task-{idx}"),
        stem => format!("{stem}-{idx}"),
    }
}
177
/// Encodes an embedding as a byte blob: each `f32` component becomes 4 little-endian
/// bytes, in order. `blob_to_embedding` performs the reverse conversion.
fn embedding_to_blob(embedding: &[f32]) -> Vec<u8> {
    let mut blob = Vec::with_capacity(embedding.len() * std::mem::size_of::<f32>());
    for component in embedding {
        blob.extend_from_slice(&component.to_le_bytes());
    }
    blob
}
182
183fn blob_to_embedding(blob: &[u8]) -> Option<Vec<f32>> {
188 if !blob.len().is_multiple_of(4) {
189 tracing::warn!(
190 len = blob.len(),
191 "plan cache: embedding blob length not a multiple of 4"
192 );
193 return None;
194 }
195 Some(
196 blob.chunks_exact(4)
197 .map(|chunk| f32::from_le_bytes(chunk.try_into().expect("chunk is exactly 4 bytes")))
198 .collect(),
199 )
200}
201
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A seconds value that does not fit in
/// `i64` saturates to `i64::MAX` rather than wrapping to a negative timestamp (the
/// previous `as` cast wrapped, and needed a lint suppression to do so).
fn unix_now() -> i64 {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    i64::try_from(secs).unwrap_or(i64::MAX)
}
211
/// Errors returned by [`PlanCache`] operations.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum PlanCacheError {
    /// A database query against `plan_cache` failed.
    #[error("sqlx error: {0}")]
    Database(#[from] zeph_db::SqlxError),
    /// A template could not be encoded to, or decoded from, JSON.
    #[error("serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    /// A task graph could not be turned into a [`PlanTemplate`] (e.g. it had no tasks).
    #[error("plan template extraction failed: {0}")]
    Extraction(String),
}
229
/// Database-backed cache of plan templates.
///
/// Entries are deduplicated by the hash of the normalized goal and can be searched by
/// cosine similarity of their goal embeddings.
pub struct PlanCache {
    /// Pool used for every query against the `plan_cache` table.
    pool: DbPool,
    /// Enable flag, similarity threshold, capacity (`max_templates`) and TTL (`ttl_days`).
    config: PlanCacheConfig,
}
239
240impl PlanCache {
241 #[tracing::instrument(
247 name = "orchestration.plan_cache.new",
248 skip_all,
249 fields(current_embedding_model = current_embedding_model)
250 )]
251 pub async fn new(
252 pool: DbPool,
253 config: PlanCacheConfig,
254 current_embedding_model: &str,
255 ) -> Result<Self, PlanCacheError> {
256 let cache = Self { pool, config };
257 cache
258 .invalidate_stale_embeddings(current_embedding_model)
259 .await?;
260 Ok(cache)
261 }
262
263 async fn invalidate_stale_embeddings(&self, current_model: &str) -> Result<(), PlanCacheError> {
269 let affected = zeph_db::query(sql!(
270 "UPDATE plan_cache SET embedding = NULL, embedding_model = NULL \
271 WHERE embedding IS NOT NULL AND embedding_model != ?"
272 ))
273 .bind(current_model)
274 .execute(&self.pool)
275 .await?
276 .rows_affected();
277
278 if affected > 0 {
279 tracing::info!(
280 rows = affected,
281 current_model,
282 "plan cache: invalidated stale embeddings for model change"
283 );
284 }
285 Ok(())
286 }
287
288 #[tracing::instrument(name = "orchestration.plan_cache.find_similar", skip_all, fields(embedding_model = embedding_model))]
300 pub async fn find_similar(
301 &self,
302 goal_embedding: &[f32],
303 embedding_model: &str,
304 ) -> Result<Option<(PlanTemplate, f32)>, PlanCacheError> {
305 let rows: Vec<(String, String, Vec<u8>)> = zeph_db::query_as(sql!(
306 "SELECT id, template, embedding FROM plan_cache \
307 WHERE embedding IS NOT NULL AND embedding_model = ? \
308 ORDER BY last_accessed_at DESC LIMIT ?"
309 ))
310 .bind(embedding_model)
311 .bind(i64::from(self.config.max_templates))
312 .fetch_all(&self.pool)
313 .await?;
314
315 let mut best_score = -1.0_f32;
316 let mut best_id: Option<String> = None;
317 let mut best_template_json: Option<String> = None;
318
319 for (id, template_json, blob) in rows {
320 if let Some(stored) = blob_to_embedding(&blob) {
321 let score = zeph_common::math::cosine_similarity(goal_embedding, &stored);
322 if score > best_score {
323 best_score = score;
324 best_id = Some(id);
325 best_template_json = Some(template_json);
326 }
327 }
328 }
329
330 if best_score >= self.config.similarity_threshold
331 && let (Some(id), Some(json)) = (best_id, best_template_json)
332 {
333 let now = unix_now();
335 if let Err(e) = zeph_db::query(sql!(
336 "UPDATE plan_cache SET last_accessed_at = ?, adapted_count = adapted_count + 1 \
337 WHERE id = ?"
338 ))
339 .bind(now)
340 .bind(&id)
341 .execute(&self.pool)
342 .await
343 {
344 tracing::warn!(error = %e, "plan cache: failed to update last_accessed_at");
345 }
346 let template: PlanTemplate = serde_json::from_str(&json)?;
347 return Ok(Some((template, best_score)));
348 }
349
350 Ok(None)
351 }
352
353 #[tracing::instrument(name = "orchestration.plan_cache.cache_plan", skip_all, fields(goal_hash = tracing::field::Empty))]
363 pub async fn cache_plan(
364 &self,
365 graph: &TaskGraph,
366 goal_embedding: &[f32],
367 embedding_model: &str,
368 ) -> Result<(), PlanCacheError> {
369 let template = PlanTemplate::from_task_graph(graph)
370 .map_err(|e| PlanCacheError::Extraction(e.to_string()))?;
371
372 let normalized = normalize_goal(&graph.goal);
373 let hash = goal_hash(&normalized);
374 tracing::Span::current().record("goal_hash", hash.as_str());
375 let template_json = serde_json::to_string(&template)?;
376 let task_count = i64::try_from(template.tasks.len()).unwrap_or(i64::MAX);
377 let now = unix_now();
378 let id = uuid::Uuid::new_v4().to_string();
379 let blob = embedding_to_blob(goal_embedding);
380
381 zeph_db::query(sql!(
382 "INSERT INTO plan_cache \
383 (id, goal_hash, goal_text, template, task_count, success_count, adapted_count, \
384 embedding, embedding_model, created_at, last_accessed_at) \
385 VALUES (?, ?, ?, ?, ?, 1, 0, ?, ?, ?, ?) \
386 ON CONFLICT(goal_hash) DO UPDATE SET \
387 success_count = success_count + 1, \
388 template = excluded.template, \
389 task_count = excluded.task_count, \
390 embedding = excluded.embedding, \
391 embedding_model = excluded.embedding_model, \
392 last_accessed_at = excluded.last_accessed_at"
393 ))
394 .bind(&id)
395 .bind(&hash)
396 .bind(&normalized)
397 .bind(&template_json)
398 .bind(task_count)
399 .bind(&blob)
400 .bind(embedding_model)
401 .bind(now)
402 .bind(now)
403 .execute(&self.pool)
404 .await?;
405
406 if let Err(e) = self.evict().await {
408 tracing::warn!(error = %e, "plan cache: eviction failed after cache_plan");
409 }
410
411 Ok(())
412 }
413
414 #[tracing::instrument(name = "orchestration.plan_cache.evict", skip_all)]
425 pub async fn evict(&self) -> Result<u32, PlanCacheError> {
426 let now = unix_now();
427 let ttl_secs = i64::from(self.config.ttl_days) * 86_400;
428 let cutoff = now.saturating_sub(ttl_secs);
429
430 let ttl_deleted = zeph_db::query(sql!("DELETE FROM plan_cache WHERE last_accessed_at < ?"))
431 .bind(cutoff)
432 .execute(&self.pool)
433 .await?
434 .rows_affected();
435
436 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
438 .fetch_one(&self.pool)
439 .await?;
440
441 let max = i64::from(self.config.max_templates);
442 let lru_deleted = if count > max {
443 let excess = count - max;
444 zeph_db::query(sql!(
445 "DELETE FROM plan_cache WHERE id IN \
446 (SELECT id FROM plan_cache ORDER BY last_accessed_at ASC LIMIT ?)"
447 ))
448 .bind(excess)
449 .execute(&self.pool)
450 .await?
451 .rows_affected()
452 } else {
453 0
454 };
455
456 let total = ttl_deleted + lru_deleted;
457 if total > 0 {
458 tracing::debug!(ttl_deleted, lru_deleted, "plan cache: eviction complete");
459 }
460 Ok(u32::try_from(total).unwrap_or(u32::MAX))
461 }
462}
463
464#[allow(clippy::too_many_arguments)]
473#[tracing::instrument(name = "orchestration.plan_cache.plan", skip_all, fields(goal_len = goal.len()))]
474pub async fn plan_with_cache<P>(
475 planner: &P,
476 plan_cache: Option<&PlanCache>,
477 provider: &impl LlmProvider,
478 embedding: Option<&[f32]>,
479 embedding_model: &str,
480 goal: &str,
481 available_agents: &[SubAgentDef],
482 max_tasks: u32,
483 planner_timeout: Duration,
484) -> Result<(TaskGraph, Option<(u64, u64)>), OrchestrationError>
485where
486 P: super::planner::Planner,
487{
488 if let (Some(cache), Some(emb)) = (plan_cache, embedding)
489 && cache.config.enabled
490 {
491 match cache.find_similar(emb, embedding_model).await {
492 Ok(Some((template, score))) => {
493 tracing::info!(
494 similarity = score,
495 tasks = template.tasks.len(),
496 "plan cache hit, adapting template"
497 );
498 match adapt_plan(
499 provider,
500 goal,
501 &template,
502 available_agents,
503 max_tasks,
504 planner_timeout,
505 )
506 .await
507 {
508 Ok(result) => return Ok(result),
509 Err(e) => {
510 tracing::warn!(
511 error = %e,
512 "plan cache: adaptation failed, falling back to full decomposition"
513 );
514 }
515 }
516 }
517 Ok(None) => {
518 tracing::debug!("plan cache miss");
519 }
520 Err(e) => {
521 tracing::warn!(error = %e, "plan cache: find_similar failed, using full decomposition");
522 }
523 }
524 }
525
526 planner.plan(goal, available_agents).await
527}
528
529#[tracing::instrument(name = "orchestration.plan_cache.adapt", skip_all, fields(goal_len = goal.len()))]
540async fn adapt_plan(
541 provider: &impl LlmProvider,
542 goal: &str,
543 template: &PlanTemplate,
544 available_agents: &[SubAgentDef],
545 max_tasks: u32,
546 timeout: Duration,
547) -> Result<(TaskGraph, Option<(u64, u64)>), OrchestrationError> {
548 use zeph_subagent::ToolPolicy;
549
550 let agent_catalog = available_agents
551 .iter()
552 .map(|a| {
553 let tools = match &a.tools {
554 ToolPolicy::AllowList(list) => list.join(", "),
555 ToolPolicy::DenyList(excluded) => {
556 format!("all except: [{}]", excluded.join(", "))
557 }
558 _ => "all".to_string(),
559 };
560 format!(
561 "- name: \"{}\", description: \"{}\", tools: [{}]",
562 a.name, a.description, tools
563 )
564 })
565 .collect::<Vec<_>>()
566 .join("\n");
567
568 let template_json = serde_json::to_string(&template.tasks)
569 .map_err(|e| OrchestrationError::PlanningFailed(e.to_string()))?;
570
571 let system = format!(
572 "You are a task planner. A cached plan template exists for a similar goal. \
573 Adapt it for the new goal by adjusting task descriptions and adding or removing \
574 tasks as needed. Keep the same JSON structure.\n\n\
575 Available agents:\n{agent_catalog}\n\n\
576 Rules:\n\
577 - Each task must have a unique task_id (short, descriptive, kebab-case: [a-z0-9-]).\n\
578 - Specify dependencies using task_id strings in depends_on.\n\
579 - Do not create more than {max_tasks} tasks.\n\
580 - failure_strategy is optional: \"abort\", \"retry\", \"skip\", \"ask\"."
581 );
582
583 let user = format!(
584 "New goal:\n{goal}\n\nCached template (for similar goal \"{}\"):\n{template_json}\n\n\
585 Adapt the template for the new goal. Return JSON: {{\"tasks\": [...]}}",
586 template.goal
587 );
588
589 let messages = vec![
590 Message::from_legacy(Role::System, system),
591 Message::from_legacy(Role::User, user),
592 ];
593
594 let response: PlannerResponse = tokio::time::timeout(timeout, provider.chat_typed(&messages))
595 .await
596 .map_err(|_| OrchestrationError::PlanningFailed("plan cache adaptation timed out".into()))?
597 .map_err(|e| OrchestrationError::PlanningFailed(e.to_string()))?;
598
599 let usage = provider.last_usage();
600
601 let graph = convert_response_pub(response, goal, available_agents, max_tasks)?;
602
603 dag::validate(&graph.tasks, max_tasks as usize)?;
604
605 Ok((graph, usage))
606}
607
#[cfg(test)]
mod tests {
    use super::super::graph::{TaskId, TaskNode};
    use super::*;
    use zeph_memory::store::SqliteStore;

    /// Opens an in-memory SQLite store and returns its pool. The tests assume the store
    /// creates the `plan_cache` table on open.
    async fn test_pool() -> DbPool {
        let store = SqliteStore::new(":memory:").await.unwrap();
        store.pool().clone()
    }

    /// Builds a default-config cache whose current embedding model is `test-model`.
    async fn test_cache(pool: DbPool) -> PlanCache {
        PlanCache::new(pool, PlanCacheConfig::default(), "test-model")
            .await
            .unwrap()
    }

    /// Builds a graph from `(title, description, dependency ids)` triples. Each task's id
    /// equals its position in `tasks`.
    fn make_graph(goal: &str, tasks: &[(&str, &str, &[u32])]) -> TaskGraph {
        let mut graph = TaskGraph::new(goal);
        for (i, (title, desc, deps)) in tasks.iter().enumerate() {
            // Test graphs are tiny, so the index always fits in u32.
            #[allow(clippy::cast_possible_truncation)]
            let mut node = TaskNode::new(i as u32, *title, *desc);
            node.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
            graph.tasks.push(node);
        }
        graph
    }

    /// Plan JSON whose single task has the given title; used to tell apart which LLM
    /// (adaptation provider vs. planner) produced a graph.
    fn single_task_json(task_id: &str, title: &str) -> String {
        format!(
            r#"{{"tasks": [{{"task_id": "{task_id}", "title": "{title}", "description": "generated", "depends_on": []}}]}}"#
        )
    }

    // --- normalize_goal ---

    #[test]
    fn normalize_trims_and_lowercases() {
        assert_eq!(normalize_goal("  Hello World  "), "hello world");
    }

    #[test]
    fn normalize_collapses_internal_whitespace() {
        assert_eq!(normalize_goal("hello   world"), "hello world");
    }

    #[test]
    fn normalize_empty_string() {
        assert_eq!(normalize_goal(""), "");
    }

    #[test]
    fn normalize_whitespace_only() {
        assert_eq!(normalize_goal("   "), "");
    }

    // --- goal_hash ---

    #[test]
    fn goal_hash_is_deterministic() {
        let h1 = goal_hash("deploy service");
        let h2 = goal_hash("deploy service");
        assert_eq!(h1, h2);
    }

    #[test]
    fn goal_hash_differs_for_different_goals() {
        assert_ne!(goal_hash("deploy service"), goal_hash("build artifact"));
    }

    #[test]
    fn goal_hash_nonempty() {
        assert!(!goal_hash("goal").is_empty());
    }

    // --- PlanTemplate ---

    #[test]
    fn template_from_empty_graph_returns_error() {
        let graph = TaskGraph::new("goal");
        assert!(PlanTemplate::from_task_graph(&graph).is_err());
    }

    #[test]
    fn template_strips_runtime_fields() {
        use crate::graph::TaskStatus;
        let mut graph = make_graph("goal", &[("Fetch data", "Download it", &[])]);
        graph.tasks[0].status = TaskStatus::Completed;
        graph.tasks[0].retry_count = 3;
        graph.tasks[0].assigned_agent = Some("agent-x".to_string());
        let template = PlanTemplate::from_task_graph(&graph).unwrap();
        assert_eq!(template.tasks[0].title, "Fetch data");
        assert_eq!(template.tasks[0].description, "Download it");
    }

    #[test]
    fn template_preserves_dependencies() {
        let graph = make_graph("goal", &[("Task A", "do A", &[]), ("Task B", "do B", &[0])]);
        let template = PlanTemplate::from_task_graph(&graph).unwrap();
        assert_eq!(template.tasks.len(), 2);
        assert!(template.tasks[0].depends_on.is_empty());
        assert_eq!(template.tasks[1].depends_on.len(), 1);
        assert_eq!(template.tasks[1].depends_on[0], template.tasks[0].task_id);
    }

    #[test]
    fn template_serde_roundtrip() {
        let graph = make_graph("goal", &[("Step one", "do step one", &[])]);
        let template = PlanTemplate::from_task_graph(&graph).unwrap();
        let json = serde_json::to_string(&template).unwrap();
        let restored: PlanTemplate = serde_json::from_str(&json).unwrap();
        assert_eq!(template.tasks[0].title, restored.tasks[0].title);
        assert_eq!(template.goal, restored.goal);
    }

    // --- embedding blobs ---

    #[test]
    fn embedding_blob_roundtrip() {
        let embedding = vec![1.0_f32, 0.5, 0.25, -1.0];
        let blob = embedding_to_blob(&embedding);
        let restored = blob_to_embedding(&blob).unwrap();
        assert_eq!(embedding, restored);
    }

    #[test]
    fn blob_to_embedding_odd_length_returns_none() {
        // 5 bytes cannot hold a whole number of f32 components.
        let bad_blob = vec![0u8; 5];
        assert!(blob_to_embedding(&bad_blob).is_none());
    }

    // --- PlanCache ---

    #[tokio::test]
    async fn cache_miss_on_empty_cache() {
        let pool = test_pool().await;
        let cache = test_cache(pool).await;
        let result = cache
            .find_similar(&[1.0, 0.0, 0.0], "test-model")
            .await
            .unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn cache_store_and_hit() {
        let pool = test_pool().await;
        let config = PlanCacheConfig {
            similarity_threshold: 0.9,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
        let embedding = vec![1.0_f32, 0.0, 0.0];
        cache
            .cache_plan(&graph, &embedding, "test-model")
            .await
            .unwrap();

        // Identical embedding: cosine similarity 1.0.
        let result = cache
            .find_similar(&[1.0, 0.0, 0.0], "test-model")
            .await
            .unwrap();
        assert!(result.is_some());
        let (template, score) = result.unwrap();
        assert!((score - 1.0).abs() < 1e-5);
        assert_eq!(template.tasks.len(), 1);
    }

    #[tokio::test]
    async fn cache_miss_on_dissimilar_goal() {
        let pool = test_pool().await;
        let config = PlanCacheConfig {
            similarity_threshold: 0.9,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        let graph = make_graph("goal a", &[("Task", "do it", &[])]);
        cache
            .cache_plan(&graph, &[1.0_f32, 0.0, 0.0], "test-model")
            .await
            .unwrap();

        // Orthogonal embedding: cosine similarity 0.0, below the 0.9 threshold.
        let result = cache
            .find_similar(&[0.0, 1.0, 0.0], "test-model")
            .await
            .unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn deduplication_increments_success_count() {
        let pool = test_pool().await;
        let cache = test_cache(pool.clone()).await;

        let graph = make_graph("same goal", &[("Task", "do it", &[])]);
        let emb = vec![1.0_f32, 0.0];

        cache.cache_plan(&graph, &emb, "test-model").await.unwrap();
        cache.cache_plan(&graph, &emb, "test-model").await.unwrap();

        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 1);

        let success: i64 = zeph_db::query_scalar(sql!("SELECT success_count FROM plan_cache"))
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(success, 2);
    }

    #[tokio::test]
    async fn eviction_removes_ttl_expired_rows() {
        let pool = test_pool().await;
        // A zero-day TTL expires everything accessed before "now".
        let config = PlanCacheConfig {
            ttl_days: 0,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool.clone(), config, "test-model")
            .await
            .unwrap();

        // One second in the past, so the row is strictly older than the cutoff.
        let now = unix_now() - 1;
        zeph_db::query(sql!(
            "INSERT INTO plan_cache \
             (id, goal_hash, goal_text, template, task_count, created_at, last_accessed_at) \
             VALUES (?, ?, ?, ?, ?, ?, ?)"
        ))
        .bind("test-id")
        .bind("hash-1")
        .bind("goal")
        .bind("{\"goal\":\"goal\",\"tasks\":[]}")
        .bind(0_i64)
        .bind(now)
        .bind(now)
        .execute(&pool)
        .await
        .unwrap();

        let deleted = cache.evict().await.unwrap();
        assert!(deleted >= 1);

        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn eviction_lru_when_over_max() {
        let pool = test_pool().await;
        let config = PlanCacheConfig {
            max_templates: 2,
            ttl_days: 365,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool.clone(), config, "test-model")
            .await
            .unwrap();

        let now = unix_now();
        for i in 0..3_i64 {
            // Distinct access times so that `id-0` is the least recently used.
            zeph_db::query(sql!(
                "INSERT INTO plan_cache \
                 (id, goal_hash, goal_text, template, task_count, created_at, last_accessed_at) \
                 VALUES (?, ?, ?, ?, ?, ?, ?)"
            ))
            .bind(format!("id-{i}"))
            .bind(format!("hash-{i}"))
            .bind(format!("goal-{i}"))
            .bind("{\"goal\":\"g\",\"tasks\":[]}")
            .bind(0_i64)
            .bind(now)
            .bind(now + i)
            .execute(&pool)
            .await
            .unwrap();
        }

        let deleted = cache.evict().await.unwrap();
        assert_eq!(deleted, 1);

        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn stale_embedding_invalidated_on_new() {
        let pool = test_pool().await;
        let now = unix_now();

        // Seed a row embedded by a different model before the cache is constructed.
        let emb = embedding_to_blob(&[1.0_f32, 0.0]);
        zeph_db::query(sql!(
            "INSERT INTO plan_cache \
             (id, goal_hash, goal_text, template, task_count, embedding, embedding_model, \
             created_at, last_accessed_at) \
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
        ))
        .bind("id-old")
        .bind("hash-old")
        .bind("goal old")
        .bind("{\"goal\":\"g\",\"tasks\":[]}")
        .bind(0_i64)
        .bind(&emb)
        .bind("old-model")
        .bind(now)
        .bind(now)
        .execute(&pool)
        .await
        .unwrap();

        let _cache = PlanCache::new(pool.clone(), PlanCacheConfig::default(), "new-model")
            .await
            .unwrap();

        let model: Option<String> = zeph_db::query_scalar(sql!(
            "SELECT embedding_model FROM plan_cache WHERE id = 'id-old'"
        ))
        .fetch_one(&pool)
        .await
        .unwrap();
        assert!(model.is_none(), "stale embedding_model should be NULL");

        let emb_col: Option<Vec<u8>> =
            zeph_db::query_scalar(sql!("SELECT embedding FROM plan_cache WHERE id = 'id-old'"))
                .fetch_one(&pool)
                .await
                .unwrap();
        assert!(emb_col.is_none(), "stale embedding should be NULL");
    }

    // --- plan_with_cache ---

    #[tokio::test]
    async fn disabled_cache_not_used_in_plan_with_cache() {
        use crate::planner::LlmPlanner;
        use zeph_config::OrchestrationConfig;
        use zeph_llm::mock::MockProvider;

        let pool = test_pool().await;
        // Explicitly disabled, with a low threshold and a perfectly matching cached plan:
        // the `enabled` flag is the only thing preventing a cache hit.
        let config = PlanCacheConfig {
            enabled: false,
            similarity_threshold: 0.5,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        let cached = make_graph("do something", &[("Cached", "cached task", &[])]);
        cache
            .cache_plan(&cached, &[1.0_f32, 0.0], "test-model")
            .await
            .unwrap();

        // Seeing "Adapted" would mean the disabled cache was consulted.
        let provider = MockProvider::with_responses(vec![single_task_json("adapted-0", "Adapted")]);
        let planner = LlmPlanner::new(
            MockProvider::with_responses(vec![single_task_json("t1", "Task")]),
            &OrchestrationConfig::default(),
        );

        let (graph, _) = plan_with_cache(
            &planner,
            Some(&cache),
            &provider,
            Some(&[1.0_f32, 0.0]),
            "test-model",
            "do something",
            &[],
            20,
            Duration::from_mins(2),
        )
        .await
        .unwrap();

        assert_eq!(graph.tasks.len(), 1);
        assert_eq!(graph.tasks[0].title, "Task");
    }

    #[tokio::test]
    async fn plan_with_cache_with_none_embedding_skips_cache() {
        use crate::planner::LlmPlanner;
        use zeph_config::OrchestrationConfig;
        use zeph_llm::mock::MockProvider;

        let pool = test_pool().await;
        let config = PlanCacheConfig {
            enabled: true,
            similarity_threshold: 0.5,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        // A matching plan is cached, but without a query embedding it must not be used.
        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
        cache
            .cache_plan(&graph, &[1.0_f32, 0.0], "test-model")
            .await
            .unwrap();

        // Distinct titles so the assertion proves which path produced the graph.
        let provider = MockProvider::with_responses(vec![single_task_json("adapted-0", "Adapted")]);
        let planner = LlmPlanner::new(
            MockProvider::with_responses(vec![single_task_json("fallback-task-0", "Fallback")]),
            &OrchestrationConfig::default(),
        );

        let (result_graph, _) = plan_with_cache(
            &planner,
            Some(&cache),
            &provider,
            None,
            "test-model",
            "deploy service",
            &[],
            20,
            Duration::from_mins(2),
        )
        .await
        .unwrap();

        assert_eq!(result_graph.tasks[0].title, "Fallback");
    }

    #[tokio::test]
    async fn plan_with_cache_hit_uses_adapted_template() {
        use crate::planner::LlmPlanner;
        use zeph_config::OrchestrationConfig;
        use zeph_llm::mock::MockProvider;

        let pool = test_pool().await;
        let config = PlanCacheConfig {
            enabled: true,
            similarity_threshold: 0.5,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
        cache
            .cache_plan(&graph, &[1.0_f32, 0.0], "test-model")
            .await
            .unwrap();

        let provider = MockProvider::with_responses(vec![single_task_json("adapted-0", "Adapted")]);
        let planner = LlmPlanner::new(
            MockProvider::with_responses(vec![single_task_json("planner-0", "Planner")]),
            &OrchestrationConfig::default(),
        );

        let (result_graph, _) = plan_with_cache(
            &planner,
            Some(&cache),
            &provider,
            Some(&[1.0_f32, 0.0]),
            "test-model",
            "deploy service",
            &[],
            20,
            Duration::from_mins(2),
        )
        .await
        .unwrap();

        assert_eq!(result_graph.tasks[0].title, "Adapted");
    }

    #[tokio::test]
    async fn adapt_plan_error_fallback_to_full_decomposition() {
        use crate::planner::LlmPlanner;
        use zeph_config::OrchestrationConfig;
        use zeph_llm::mock::MockProvider;

        let pool = test_pool().await;
        let config = PlanCacheConfig {
            enabled: true,
            similarity_threshold: 0.5,
            ..PlanCacheConfig::default()
        };
        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();

        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
        cache
            .cache_plan(&graph, &[1.0_f32, 0.0], "test-model")
            .await
            .unwrap();

        // The adaptation provider returns unparsable output, forcing the fallback path.
        let bad_provider = MockProvider::with_responses(vec!["not valid json".to_string()]);

        let fallback_json = r#"{"tasks": [
            {"task_id": "fallback-0", "title": "Fallback Task", "description": "via planner", "depends_on": []}
        ]}"#
        .to_string();
        let planner = LlmPlanner::new(
            MockProvider::with_responses(vec![fallback_json]),
            &OrchestrationConfig::default(),
        );

        let (result_graph, _) = plan_with_cache(
            &planner,
            Some(&cache),
            &bad_provider,
            Some(&[1.0_f32, 0.0]),
            "test-model",
            "deploy service",
            &[],
            20,
            Duration::from_mins(2),
        )
        .await
        .unwrap();

        assert_eq!(result_graph.tasks[0].title, "Fallback Task");
    }
}