Skip to main content

zeph_orchestration/
plan_cache.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Plan template caching for the LLM planner.
5//!
6//! Caches completed `TaskGraph` plans as reusable `PlanTemplate` skeletons.
7//! On subsequent semantically similar goals, retrieves the closest template
8//! and uses a lightweight LLM adaptation call instead of full decomposition.
9
10use std::time::Duration;
11
12use blake3;
13use serde::{Deserialize, Serialize};
14use zeph_config::PlanCacheConfig;
15use zeph_db::DbPool;
16#[cfg(any(feature = "sqlite", feature = "postgres"))]
17use zeph_db::sql;
18use zeph_llm::provider::{LlmProvider, Message, Role};
19
20use super::dag;
21use super::error::OrchestrationError;
22use super::graph::{FailureStrategy, PlanSlug, TaskGraph};
23use super::planner::{PlannerResponse, convert_response_pub};
24use zeph_subagent::SubAgentDef;
25
26/// Structural skeleton of a single task, stripped of all runtime state.
27///
28/// Used inside a [`PlanTemplate`]. Serialised as JSON in the `plan_cache` table.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct TemplateTask {
31    /// Human-readable task title.
32    pub title: String,
33    /// Full task description (agent prompt).
34    pub description: String,
35    /// Preferred agent name, if specified by the original planner.
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub agent_hint: Option<String>,
38    /// Kebab-case slugs of tasks this task depends on.
39    #[serde(default, skip_serializing_if = "Vec::is_empty")]
40    pub depends_on: Vec<PlanSlug>,
41    /// Failure strategy override for this task, if set by the original planner.
42    #[serde(default, skip_serializing_if = "Option::is_none")]
43    pub failure_strategy: Option<FailureStrategy>,
44    /// Stable kebab-case slug assigned during template extraction.
45    pub task_id: PlanSlug,
46}
47
48/// Reusable plan skeleton extracted from a successfully completed [`TaskGraph`].
49///
50/// Contains only structural information (task titles, descriptions, dependencies,
51/// agent hints). All runtime state (`status`, `results`, `retry_count`,
52/// `assigned_agent`, timestamps) is stripped.
53///
54/// Stored as JSON in the `plan_cache` `SQLite` table. Adapted by an LLM call when
55/// a semantically similar goal arrives via [`plan_with_cache`].
56///
57/// # Examples
58///
59/// ```rust
60/// use zeph_orchestration::plan_cache::{PlanTemplate, normalize_goal};
61///
62/// let normalized = normalize_goal("  Build AND Deploy  ");
63/// assert_eq!(normalized, "build and deploy");
64/// ```
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct PlanTemplate {
67    /// Normalised goal text used for exact-match fallback.
68    pub goal: String,
69    /// Structural task skeleton ordered by dependency (roots first).
70    pub tasks: Vec<TemplateTask>,
71}
72
73impl PlanTemplate {
74    /// Extract a `PlanTemplate` from a completed `TaskGraph`.
75    ///
76    /// # Errors
77    ///
78    /// Returns `OrchestrationError::PlanningFailed` if the graph has no tasks.
79    pub fn from_task_graph(graph: &TaskGraph) -> Result<Self, OrchestrationError> {
80        if graph.tasks.is_empty() {
81            return Err(OrchestrationError::PlanningFailed(
82                "cannot cache a plan with zero tasks".into(),
83            ));
84        }
85
86        // Build PlanSlug indexed by position for depends_on reconstruction.
87        let id_to_slug: Vec<PlanSlug> = graph
88            .tasks
89            .iter()
90            .map(|n| PlanSlug::from(slugify_title(&n.title, n.id.as_u32())))
91            .collect();
92
93        let tasks = graph
94            .tasks
95            .iter()
96            .enumerate()
97            .map(|(i, node)| TemplateTask {
98                title: node.title.clone(),
99                description: node.description.clone(),
100                agent_hint: node.agent_hint.clone(),
101                depends_on: node
102                    .depends_on
103                    .iter()
104                    .map(|dep| id_to_slug[dep.index()].clone())
105                    .collect(),
106                failure_strategy: node.failure_strategy,
107                task_id: id_to_slug[i].clone(),
108            })
109            .collect();
110
111        Ok(Self {
112            goal: normalize_goal(&graph.goal),
113            tasks,
114        })
115    }
116}
117
118/// Normalize goal text: trim + collapse internal whitespace + lowercase.
119///
120/// Used consistently for hash computation and embedding input so that
121/// trivially different goal strings (capitalization, extra spaces) map
122/// to the same cache entry.
123#[must_use]
124pub fn normalize_goal(text: &str) -> String {
125    let trimmed = text.trim();
126    let mut result = String::with_capacity(trimmed.len());
127    let mut prev_space = false;
128    for ch in trimmed.chars() {
129        if ch.is_whitespace() {
130            if !prev_space && !result.is_empty() {
131                result.push(' ');
132                prev_space = true;
133            }
134        } else {
135            for lc in ch.to_lowercase() {
136                result.push(lc);
137            }
138            prev_space = false;
139        }
140    }
141    result
142}
143
144/// Compute a BLAKE3 hex hash of a normalized goal string.
145#[must_use]
146pub fn goal_hash(normalized: &str) -> String {
147    blake3::hash(normalized.as_bytes()).to_hex().to_string()
148}
149
150/// Convert a task title + index into a stable kebab-case `task_id` for template use.
151fn slugify_title(title: &str, idx: u32) -> String {
152    let slug: String = title
153        .chars()
154        .map(|c| {
155            if c.is_ascii_alphanumeric() {
156                c.to_ascii_lowercase()
157            } else {
158                '-'
159            }
160        })
161        .collect::<String>()
162        .split('-')
163        .filter(|s| !s.is_empty())
164        .collect::<Vec<_>>()
165        .join("-");
166
167    // Cap at 32 chars, then append index to ensure uniqueness.
168    let capped = if slug.len() > 32 { &slug[..32] } else { &slug };
169    // Trim trailing dashes after cap.
170    let capped = capped.trim_end_matches('-');
171    if capped.is_empty() {
172        format!("task-{idx}")
173    } else {
174        format!("{capped}-{idx}")
175    }
176}
177
178/// Serialize an `f32` slice to a `Vec<u8>` BLOB using explicit little-endian encoding.
179fn embedding_to_blob(embedding: &[f32]) -> Vec<u8> {
180    embedding.iter().flat_map(|f| f.to_le_bytes()).collect()
181}
182
183/// Deserialize an `f32` slice from a BLOB using chunk-based little-endian decoding.
184///
185/// Returns `None` and logs a warning if the BLOB length is not a multiple of 4.
186/// Does not require aligned memory — safe for `Vec<u8>` returned by `SQLite`.
187fn blob_to_embedding(blob: &[u8]) -> Option<Vec<f32>> {
188    if !blob.len().is_multiple_of(4) {
189        tracing::warn!(
190            len = blob.len(),
191            "plan cache: embedding blob length not a multiple of 4"
192        );
193        return None;
194    }
195    Some(
196        blob.chunks_exact(4)
197            .map(|chunk| f32::from_le_bytes(chunk.try_into().expect("chunk is exactly 4 bytes")))
198            .collect(),
199    )
200}
201
202fn unix_now() -> i64 {
203    #[allow(clippy::cast_possible_wrap)]
204    {
205        std::time::SystemTime::now()
206            .duration_since(std::time::UNIX_EPOCH)
207            .unwrap_or_default()
208            .as_secs() as i64
209    }
210}
211
212#[non_exhaustive]
213/// Error type for plan cache operations.
214///
215/// All errors are non-fatal in the context of the planning critical path:
216/// callers should log the error and fall back to full LLM decomposition.
217#[derive(Debug, thiserror::Error)]
218pub enum PlanCacheError {
219    /// A `SQLite` query failed.
220    #[error("sqlx error: {0}")]
221    Database(#[from] zeph_db::SqlxError),
222    /// JSON serialization or deserialization of a plan template failed.
223    #[error("serialization error: {0}")]
224    Serialization(#[from] serde_json::Error),
225    /// Extracting a [`PlanTemplate`] from a completed [`TaskGraph`] failed.
226    #[error("plan template extraction failed: {0}")]
227    Extraction(String),
228}
229
230/// Plan template cache backed by `SQLite` with in-process cosine similarity search.
231///
232/// Stores embeddings as BLOB columns and computes cosine similarity in-process
233/// (same pattern as `ResponseCache`). Graceful degradation: all failures are
234/// logged as WARN and never block the planning critical path.
235pub struct PlanCache {
236    pool: DbPool,
237    config: PlanCacheConfig,
238}
239
240impl PlanCache {
241    /// Create a new `PlanCache` and invalidate stale embeddings for the given model.
242    ///
243    /// # Errors
244    ///
245    /// Returns `PlanCacheError` if the stale embedding invalidation query fails.
246    #[tracing::instrument(
247        name = "orchestration.plan_cache.new",
248        skip_all,
249        fields(current_embedding_model = current_embedding_model)
250    )]
251    pub async fn new(
252        pool: DbPool,
253        config: PlanCacheConfig,
254        current_embedding_model: &str,
255    ) -> Result<Self, PlanCacheError> {
256        let cache = Self { pool, config };
257        cache
258            .invalidate_stale_embeddings(current_embedding_model)
259            .await?;
260        Ok(cache)
261    }
262
263    /// NULL-ify embeddings stored under a different model to prevent cross-model false hits.
264    ///
265    /// # Errors
266    ///
267    /// Returns `PlanCacheError::Database` on query failure.
268    async fn invalidate_stale_embeddings(&self, current_model: &str) -> Result<(), PlanCacheError> {
269        let affected = zeph_db::query(sql!(
270            "UPDATE plan_cache SET embedding = NULL, embedding_model = NULL \
271             WHERE embedding IS NOT NULL AND embedding_model != ?"
272        ))
273        .bind(current_model)
274        .execute(&self.pool)
275        .await?
276        .rows_affected();
277
278        if affected > 0 {
279            tracing::info!(
280                rows = affected,
281                current_model,
282                "plan cache: invalidated stale embeddings for model change"
283            );
284        }
285        Ok(())
286    }
287
288    /// Find the most similar cached plan template for the given goal embedding.
289    ///
290    /// Fetches all rows with matching `embedding_model`, computes cosine similarity
291    /// in-process, and returns the best match if it meets `similarity_threshold`.
292    ///
293    /// Also updates `last_accessed_at` on a hit.
294    ///
295    /// # Errors
296    ///
297    /// Returns `PlanCacheError::Database` on query failure or
298    /// `PlanCacheError::Serialization` on template JSON deserialization failure.
299    #[tracing::instrument(name = "orchestration.plan_cache.find_similar", skip_all, fields(embedding_model = embedding_model))]
300    pub async fn find_similar(
301        &self,
302        goal_embedding: &[f32],
303        embedding_model: &str,
304    ) -> Result<Option<(PlanTemplate, f32)>, PlanCacheError> {
305        let rows: Vec<(String, String, Vec<u8>)> = zeph_db::query_as(sql!(
306            "SELECT id, template, embedding FROM plan_cache \
307             WHERE embedding IS NOT NULL AND embedding_model = ? \
308             ORDER BY last_accessed_at DESC LIMIT ?"
309        ))
310        .bind(embedding_model)
311        .bind(i64::from(self.config.max_templates))
312        .fetch_all(&self.pool)
313        .await?;
314
315        let mut best_score = -1.0_f32;
316        let mut best_id: Option<String> = None;
317        let mut best_template_json: Option<String> = None;
318
319        for (id, template_json, blob) in rows {
320            if let Some(stored) = blob_to_embedding(&blob) {
321                let score = zeph_common::math::cosine_similarity(goal_embedding, &stored);
322                if score > best_score {
323                    best_score = score;
324                    best_id = Some(id);
325                    best_template_json = Some(template_json);
326                }
327            }
328        }
329
330        if best_score >= self.config.similarity_threshold
331            && let (Some(id), Some(json)) = (best_id, best_template_json)
332        {
333            // Update last_accessed_at on hit.
334            let now = unix_now();
335            if let Err(e) = zeph_db::query(sql!(
336                "UPDATE plan_cache SET last_accessed_at = ?, adapted_count = adapted_count + 1 \
337                 WHERE id = ?"
338            ))
339            .bind(now)
340            .bind(&id)
341            .execute(&self.pool)
342            .await
343            {
344                tracing::warn!(error = %e, "plan cache: failed to update last_accessed_at");
345            }
346            let template: PlanTemplate = serde_json::from_str(&json)?;
347            return Ok(Some((template, best_score)));
348        }
349
350        Ok(None)
351    }
352
353    /// Store a completed plan as a reusable template.
354    ///
355    /// Extracts a `PlanTemplate` from the `TaskGraph`, serializes it to JSON,
356    /// and upserts into `SQLite` using `INSERT OR REPLACE ON CONFLICT(goal_hash)`.
357    /// Deduplication is enforced by the `UNIQUE` constraint on `goal_hash`.
358    ///
359    /// # Errors
360    ///
361    /// Returns `PlanCacheError` on extraction, serialization, or database failure.
362    #[tracing::instrument(name = "orchestration.plan_cache.cache_plan", skip_all, fields(goal_hash = tracing::field::Empty))]
363    pub async fn cache_plan(
364        &self,
365        graph: &TaskGraph,
366        goal_embedding: &[f32],
367        embedding_model: &str,
368    ) -> Result<(), PlanCacheError> {
369        let template = PlanTemplate::from_task_graph(graph)
370            .map_err(|e| PlanCacheError::Extraction(e.to_string()))?;
371
372        let normalized = normalize_goal(&graph.goal);
373        let hash = goal_hash(&normalized);
374        tracing::Span::current().record("goal_hash", hash.as_str());
375        let template_json = serde_json::to_string(&template)?;
376        let task_count = i64::try_from(template.tasks.len()).unwrap_or(i64::MAX);
377        let now = unix_now();
378        let id = uuid::Uuid::new_v4().to_string();
379        let blob = embedding_to_blob(goal_embedding);
380
381        zeph_db::query(sql!(
382            "INSERT INTO plan_cache \
383             (id, goal_hash, goal_text, template, task_count, success_count, adapted_count, \
384              embedding, embedding_model, created_at, last_accessed_at) \
385             VALUES (?, ?, ?, ?, ?, 1, 0, ?, ?, ?, ?) \
386             ON CONFLICT(goal_hash) DO UPDATE SET \
387               success_count = success_count + 1, \
388               template = excluded.template, \
389               task_count = excluded.task_count, \
390               embedding = excluded.embedding, \
391               embedding_model = excluded.embedding_model, \
392               last_accessed_at = excluded.last_accessed_at"
393        ))
394        .bind(&id)
395        .bind(&hash)
396        .bind(&normalized)
397        .bind(&template_json)
398        .bind(task_count)
399        .bind(&blob)
400        .bind(embedding_model)
401        .bind(now)
402        .bind(now)
403        .execute(&self.pool)
404        .await?;
405
406        // Evict after inserting to keep within size bounds.
407        if let Err(e) = self.evict().await {
408            tracing::warn!(error = %e, "plan cache: eviction failed after cache_plan");
409        }
410
411        Ok(())
412    }
413
414    /// Run TTL + size-cap LRU eviction.
415    ///
416    /// Phase 1: Delete rows where `last_accessed_at < now - ttl_days * 86400`.
417    /// Phase 2: If count exceeds `max_templates`, delete the least-recently-accessed rows.
418    ///
419    /// Returns the total number of rows deleted.
420    ///
421    /// # Errors
422    ///
423    /// Returns `PlanCacheError::Database` on query failure.
424    #[tracing::instrument(name = "orchestration.plan_cache.evict", skip_all)]
425    pub async fn evict(&self) -> Result<u32, PlanCacheError> {
426        let now = unix_now();
427        let ttl_secs = i64::from(self.config.ttl_days) * 86_400;
428        let cutoff = now.saturating_sub(ttl_secs);
429
430        let ttl_deleted = zeph_db::query(sql!("DELETE FROM plan_cache WHERE last_accessed_at < ?"))
431            .bind(cutoff)
432            .execute(&self.pool)
433            .await?
434            .rows_affected();
435
436        // Count remaining rows.
437        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
438            .fetch_one(&self.pool)
439            .await?;
440
441        let max = i64::from(self.config.max_templates);
442        let lru_deleted = if count > max {
443            let excess = count - max;
444            zeph_db::query(sql!(
445                "DELETE FROM plan_cache WHERE id IN \
446                 (SELECT id FROM plan_cache ORDER BY last_accessed_at ASC LIMIT ?)"
447            ))
448            .bind(excess)
449            .execute(&self.pool)
450            .await?
451            .rows_affected()
452        } else {
453            0
454        };
455
456        let total = ttl_deleted + lru_deleted;
457        if total > 0 {
458            tracing::debug!(ttl_deleted, lru_deleted, "plan cache: eviction complete");
459        }
460        Ok(u32::try_from(total).unwrap_or(u32::MAX))
461    }
462}
463
464/// Wrapper that checks the plan cache before calling the planner.
465///
466/// On a cache hit, calls `adapt_plan` with the cached template and the given
467/// `LlmProvider`. Falls back to full `planner.plan()` on any failure.
468///
469/// # Errors
470///
471/// Returns `OrchestrationError` from the planner on full-decomposition fallback.
472#[allow(clippy::too_many_arguments)]
473#[tracing::instrument(name = "orchestration.plan_cache.plan", skip_all, fields(goal_len = goal.len()))]
474pub async fn plan_with_cache<P>(
475    planner: &P,
476    plan_cache: Option<&PlanCache>,
477    provider: &impl LlmProvider,
478    embedding: Option<&[f32]>,
479    embedding_model: &str,
480    goal: &str,
481    available_agents: &[SubAgentDef],
482    max_tasks: u32,
483    planner_timeout: Duration,
484) -> Result<(TaskGraph, Option<(u64, u64)>), OrchestrationError>
485where
486    P: super::planner::Planner,
487{
488    if let (Some(cache), Some(emb)) = (plan_cache, embedding)
489        && cache.config.enabled
490    {
491        match cache.find_similar(emb, embedding_model).await {
492            Ok(Some((template, score))) => {
493                tracing::info!(
494                    similarity = score,
495                    tasks = template.tasks.len(),
496                    "plan cache hit, adapting template"
497                );
498                match adapt_plan(
499                    provider,
500                    goal,
501                    &template,
502                    available_agents,
503                    max_tasks,
504                    planner_timeout,
505                )
506                .await
507                {
508                    Ok(result) => return Ok(result),
509                    Err(e) => {
510                        tracing::warn!(
511                            error = %e,
512                            "plan cache: adaptation failed, falling back to full decomposition"
513                        );
514                    }
515                }
516            }
517            Ok(None) => {
518                tracing::debug!("plan cache miss");
519            }
520            Err(e) => {
521                tracing::warn!(error = %e, "plan cache: find_similar failed, using full decomposition");
522            }
523        }
524    }
525
526    planner.plan(goal, available_agents).await
527}
528
529/// Build an adaptation prompt and call the LLM to produce a `TaskGraph` adapted
530/// from a cached template for the new goal.
531///
532/// Uses `LlmProvider::chat_typed` with the same `PlannerResponse` schema as the
533/// full planner, so the existing `convert_response + dag::validate` pipeline applies.
534///
535/// # Errors
536///
537/// Returns `OrchestrationError::PlanningFailed` if the LLM call fails, times out, or the
538/// adapted graph fails DAG validation.
539#[tracing::instrument(name = "orchestration.plan_cache.adapt", skip_all, fields(goal_len = goal.len()))]
540async fn adapt_plan(
541    provider: &impl LlmProvider,
542    goal: &str,
543    template: &PlanTemplate,
544    available_agents: &[SubAgentDef],
545    max_tasks: u32,
546    timeout: Duration,
547) -> Result<(TaskGraph, Option<(u64, u64)>), OrchestrationError> {
548    use zeph_subagent::ToolPolicy;
549
550    let agent_catalog = available_agents
551        .iter()
552        .map(|a| {
553            let tools = match &a.tools {
554                ToolPolicy::AllowList(list) => list.join(", "),
555                ToolPolicy::DenyList(excluded) => {
556                    format!("all except: [{}]", excluded.join(", "))
557                }
558                _ => "all".to_string(),
559            };
560            format!(
561                "- name: \"{}\", description: \"{}\", tools: [{}]",
562                a.name, a.description, tools
563            )
564        })
565        .collect::<Vec<_>>()
566        .join("\n");
567
568    let template_json = serde_json::to_string(&template.tasks)
569        .map_err(|e| OrchestrationError::PlanningFailed(e.to_string()))?;
570
571    let system = format!(
572        "You are a task planner. A cached plan template exists for a similar goal. \
573         Adapt it for the new goal by adjusting task descriptions and adding or removing \
574         tasks as needed. Keep the same JSON structure.\n\n\
575         Available agents:\n{agent_catalog}\n\n\
576         Rules:\n\
577         - Each task must have a unique task_id (short, descriptive, kebab-case: [a-z0-9-]).\n\
578         - Specify dependencies using task_id strings in depends_on.\n\
579         - Do not create more than {max_tasks} tasks.\n\
580         - failure_strategy is optional: \"abort\", \"retry\", \"skip\", \"ask\"."
581    );
582
583    let user = format!(
584        "New goal:\n{goal}\n\nCached template (for similar goal \"{}\"):\n{template_json}\n\n\
585         Adapt the template for the new goal. Return JSON: {{\"tasks\": [...]}}",
586        template.goal
587    );
588
589    let messages = vec![
590        Message::from_legacy(Role::System, system),
591        Message::from_legacy(Role::User, user),
592    ];
593
594    let response: PlannerResponse = tokio::time::timeout(timeout, provider.chat_typed(&messages))
595        .await
596        .map_err(|_| OrchestrationError::PlanningFailed("plan cache adaptation timed out".into()))?
597        .map_err(|e| OrchestrationError::PlanningFailed(e.to_string()))?;
598
599    let usage = provider.last_usage();
600
601    let graph = convert_response_pub(response, goal, available_agents, max_tasks)?;
602
603    dag::validate(&graph.tasks, max_tasks as usize)?;
604
605    Ok((graph, usage))
606}
607
608#[cfg(test)]
609mod tests {
610    use super::super::graph::{TaskId, TaskNode};
611    use super::*;
612    use zeph_memory::store::SqliteStore;
613
614    async fn test_pool() -> DbPool {
615        let store = SqliteStore::new(":memory:").await.unwrap();
616        store.pool().clone()
617    }
618
619    async fn test_cache(pool: DbPool) -> PlanCache {
620        PlanCache::new(pool, PlanCacheConfig::default(), "test-model")
621            .await
622            .unwrap()
623    }
624
625    fn make_graph(goal: &str, tasks: &[(&str, &str, &[u32])]) -> TaskGraph {
626        let mut graph = TaskGraph::new(goal);
627        for (i, (title, desc, deps)) in tasks.iter().enumerate() {
628            #[allow(clippy::cast_possible_truncation)]
629            let mut node = TaskNode::new(i as u32, *title, *desc);
630            node.depends_on = deps.iter().map(|&d| TaskId(d)).collect();
631            graph.tasks.push(node);
632        }
633        graph
634    }
635
636    // --- normalize_goal tests ---
637
638    #[test]
639    fn normalize_trims_and_lowercases() {
640        assert_eq!(normalize_goal("  Hello World  "), "hello world");
641    }
642
643    #[test]
644    fn normalize_collapses_internal_whitespace() {
645        assert_eq!(normalize_goal("hello   world"), "hello world");
646    }
647
648    #[test]
649    fn normalize_empty_string() {
650        assert_eq!(normalize_goal(""), "");
651    }
652
653    #[test]
654    fn normalize_whitespace_only() {
655        assert_eq!(normalize_goal("   "), "");
656    }
657
658    // --- goal_hash tests ---
659
660    #[test]
661    fn goal_hash_is_deterministic() {
662        let h1 = goal_hash("deploy service");
663        let h2 = goal_hash("deploy service");
664        assert_eq!(h1, h2);
665    }
666
667    #[test]
668    fn goal_hash_differs_for_different_goals() {
669        assert_ne!(goal_hash("deploy service"), goal_hash("build artifact"));
670    }
671
672    #[test]
673    fn goal_hash_nonempty() {
674        assert!(!goal_hash("goal").is_empty());
675    }
676
677    // --- PlanTemplate extraction tests ---
678
679    #[test]
680    fn template_from_empty_graph_returns_error() {
681        let graph = TaskGraph::new("goal");
682        assert!(PlanTemplate::from_task_graph(&graph).is_err());
683    }
684
685    #[test]
686    fn template_strips_runtime_fields() {
687        use crate::graph::TaskStatus;
688        let mut graph = make_graph("goal", &[("Fetch data", "Download it", &[])]);
689        graph.tasks[0].status = TaskStatus::Completed;
690        graph.tasks[0].retry_count = 3;
691        graph.tasks[0].assigned_agent = Some("agent-x".to_string());
692        let template = PlanTemplate::from_task_graph(&graph).unwrap();
693        // Template only has structural data — no TaskStatus, retry_count, etc.
694        assert_eq!(template.tasks[0].title, "Fetch data");
695        assert_eq!(template.tasks[0].description, "Download it");
696    }
697
698    #[test]
699    fn template_preserves_dependencies() {
700        let graph = make_graph("goal", &[("Task A", "do A", &[]), ("Task B", "do B", &[0])]);
701        let template = PlanTemplate::from_task_graph(&graph).unwrap();
702        assert_eq!(template.tasks.len(), 2);
703        assert!(template.tasks[0].depends_on.is_empty());
704        assert_eq!(template.tasks[1].depends_on.len(), 1);
705        assert_eq!(template.tasks[1].depends_on[0], template.tasks[0].task_id);
706    }
707
708    #[test]
709    fn template_serde_roundtrip() {
710        let graph = make_graph("goal", &[("Step one", "do step one", &[])]);
711        let template = PlanTemplate::from_task_graph(&graph).unwrap();
712        let json = serde_json::to_string(&template).unwrap();
713        let restored: PlanTemplate = serde_json::from_str(&json).unwrap();
714        assert_eq!(template.tasks[0].title, restored.tasks[0].title);
715        assert_eq!(template.goal, restored.goal);
716    }
717
718    // --- BLOB serialization tests ---
719
720    #[test]
721    fn embedding_blob_roundtrip() {
722        let embedding = vec![1.0_f32, 0.5, 0.25, -1.0];
723        let blob = embedding_to_blob(&embedding);
724        let restored = blob_to_embedding(&blob).unwrap();
725        assert_eq!(embedding, restored);
726    }
727
728    #[test]
729    fn blob_to_embedding_odd_length_returns_none() {
730        let bad_blob = vec![0u8; 5]; // not a multiple of 4
731        assert!(blob_to_embedding(&bad_blob).is_none());
732    }
733
734    // --- PlanCache integration tests ---
735
736    #[tokio::test]
737    async fn cache_miss_on_empty_cache() {
738        let pool = test_pool().await;
739        let cache = test_cache(pool).await;
740        let result = cache
741            .find_similar(&[1.0, 0.0, 0.0], "test-model")
742            .await
743            .unwrap();
744        assert!(result.is_none());
745    }
746
747    #[tokio::test]
748    async fn cache_store_and_hit() {
749        let pool = test_pool().await;
750        let config = PlanCacheConfig {
751            similarity_threshold: 0.9,
752            ..PlanCacheConfig::default()
753        };
754        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();
755
756        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
757        let embedding = vec![1.0_f32, 0.0, 0.0];
758        cache
759            .cache_plan(&graph, &embedding, "test-model")
760            .await
761            .unwrap();
762
763        // Same embedding should hit.
764        let result = cache
765            .find_similar(&[1.0, 0.0, 0.0], "test-model")
766            .await
767            .unwrap();
768        assert!(result.is_some());
769        let (template, score) = result.unwrap();
770        assert!((score - 1.0).abs() < 1e-5);
771        assert_eq!(template.tasks.len(), 1);
772    }
773
774    #[tokio::test]
775    async fn cache_miss_on_dissimilar_goal() {
776        let pool = test_pool().await;
777        let config = PlanCacheConfig {
778            similarity_threshold: 0.9,
779            ..PlanCacheConfig::default()
780        };
781        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();
782
783        let graph = make_graph("goal a", &[("Task", "do it", &[])]);
784        cache
785            .cache_plan(&graph, &[1.0_f32, 0.0, 0.0], "test-model")
786            .await
787            .unwrap();
788
789        // Orthogonal vector — should not hit at threshold 0.9.
790        let result = cache
791            .find_similar(&[0.0, 1.0, 0.0], "test-model")
792            .await
793            .unwrap();
794        assert!(result.is_none());
795    }
796
797    #[tokio::test]
798    async fn deduplication_increments_success_count() {
799        let pool = test_pool().await;
800        let cache = test_cache(pool.clone()).await;
801
802        let graph = make_graph("same goal", &[("Task", "do it", &[])]);
803        let emb = vec![1.0_f32, 0.0];
804
805        cache.cache_plan(&graph, &emb, "test-model").await.unwrap();
806        cache.cache_plan(&graph, &emb, "test-model").await.unwrap();
807
808        // Only one row due to UNIQUE goal_hash.
809        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
810            .fetch_one(&pool)
811            .await
812            .unwrap();
813        assert_eq!(count, 1);
814
815        let success: i64 = zeph_db::query_scalar(sql!("SELECT success_count FROM plan_cache"))
816            .fetch_one(&pool)
817            .await
818            .unwrap();
819        assert_eq!(success, 2);
820    }
821
822    #[tokio::test]
823    async fn eviction_removes_ttl_expired_rows() {
824        let pool = test_pool().await;
825        // TTL of 0 days means everything is immediately expired.
826        let config = PlanCacheConfig {
827            ttl_days: 0,
828            ..PlanCacheConfig::default()
829        };
830        let cache = PlanCache::new(pool.clone(), config, "test-model")
831            .await
832            .unwrap();
833
834        // Insert a row by bypassing the API to set last_accessed_at in the past.
835        let now = unix_now() - 1;
836        zeph_db::query(sql!(
837            "INSERT INTO plan_cache \
838             (id, goal_hash, goal_text, template, task_count, created_at, last_accessed_at) \
839             VALUES (?, ?, ?, ?, ?, ?, ?)"
840        ))
841        .bind("test-id")
842        .bind("hash-1")
843        .bind("goal")
844        .bind("{\"goal\":\"goal\",\"tasks\":[]}")
845        .bind(0_i64)
846        .bind(now)
847        .bind(now)
848        .execute(&pool)
849        .await
850        .unwrap();
851
852        let deleted = cache.evict().await.unwrap();
853        assert!(deleted >= 1);
854
855        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
856            .fetch_one(&pool)
857            .await
858            .unwrap();
859        assert_eq!(count, 0);
860    }
861
862    #[tokio::test]
863    async fn eviction_lru_when_over_max() {
864        let pool = test_pool().await;
865        let config = PlanCacheConfig {
866            max_templates: 2,
867            ttl_days: 365,
868            ..PlanCacheConfig::default()
869        };
870        let cache = PlanCache::new(pool.clone(), config, "test-model")
871            .await
872            .unwrap();
873
874        let now = unix_now();
875        // Insert 3 rows with different last_accessed_at, all recent enough to survive TTL.
876        for i in 0..3_i64 {
877            zeph_db::query(sql!(
878                "INSERT INTO plan_cache \
879                 (id, goal_hash, goal_text, template, task_count, created_at, last_accessed_at) \
880                 VALUES (?, ?, ?, ?, ?, ?, ?)"
881            ))
882            .bind(format!("id-{i}"))
883            .bind(format!("hash-{i}"))
884            .bind(format!("goal-{i}"))
885            .bind("{\"goal\":\"g\",\"tasks\":[]}")
886            .bind(0_i64)
887            .bind(now)
888            .bind(now + i) // i=0 is least recently accessed, i=2 most recent
889            .execute(&pool)
890            .await
891            .unwrap();
892        }
893
894        let deleted = cache.evict().await.unwrap();
895        assert_eq!(deleted, 1);
896
897        // The row with smallest last_accessed_at (id-0) should be gone.
898        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM plan_cache"))
899            .fetch_one(&pool)
900            .await
901            .unwrap();
902        assert_eq!(count, 2);
903    }
904
905    #[tokio::test]
906    async fn stale_embedding_invalidated_on_new() {
907        let pool = test_pool().await;
908        let now = unix_now();
909
910        // Insert a row with "old-model" embedding.
911        let emb = embedding_to_blob(&[1.0_f32, 0.0]);
912        zeph_db::query(sql!(
913            "INSERT INTO plan_cache \
914             (id, goal_hash, goal_text, template, task_count, embedding, embedding_model, \
915              created_at, last_accessed_at) \
916             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
917        ))
918        .bind("id-old")
919        .bind("hash-old")
920        .bind("goal old")
921        .bind("{\"goal\":\"g\",\"tasks\":[]}")
922        .bind(0_i64)
923        .bind(&emb)
924        .bind("old-model")
925        .bind(now)
926        .bind(now)
927        .execute(&pool)
928        .await
929        .unwrap();
930
931        // Constructing cache with "new-model" should invalidate the old embedding.
932        let _cache = PlanCache::new(pool.clone(), PlanCacheConfig::default(), "new-model")
933            .await
934            .unwrap();
935
936        let model: Option<String> = zeph_db::query_scalar(sql!(
937            "SELECT embedding_model FROM plan_cache WHERE id = 'id-old'"
938        ))
939        .fetch_one(&pool)
940        .await
941        .unwrap();
942        assert!(model.is_none(), "stale embedding_model should be NULL");
943
944        let emb_col: Option<Vec<u8>> =
945            zeph_db::query_scalar(sql!("SELECT embedding FROM plan_cache WHERE id = 'id-old'"))
946                .fetch_one(&pool)
947                .await
948                .unwrap();
949        assert!(emb_col.is_none(), "stale embedding should be NULL");
950    }
951
952    #[tokio::test]
953    async fn disabled_cache_not_used_in_plan_with_cache() {
954        use crate::planner::LlmPlanner;
955        use zeph_config::OrchestrationConfig;
956        use zeph_llm::mock::MockProvider;
957
958        let pool = test_pool().await;
959        let config = PlanCacheConfig::default(); // enabled = false
960        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();
961
962        let graph_json = r#"{"tasks": [
963            {"task_id": "t1", "title": "Task", "description": "do it", "depends_on": []}
964        ]}"#
965        .to_string();
966
967        let provider = MockProvider::with_responses(vec![graph_json.clone()]);
968        let planner = LlmPlanner::new(
969            MockProvider::with_responses(vec![graph_json]),
970            &OrchestrationConfig::default(),
971        );
972
973        let (graph, _) = plan_with_cache(
974            &planner,
975            Some(&cache),
976            &provider,
977            Some(&[1.0_f32, 0.0]),
978            "test-model",
979            "do something",
980            &[],
981            20,
982            Duration::from_mins(2),
983        )
984        .await
985        .unwrap();
986
987        assert_eq!(graph.tasks.len(), 1);
988    }
989
990    #[tokio::test]
991    async fn plan_with_cache_with_none_embedding_skips_cache() {
992        use crate::planner::LlmPlanner;
993        use zeph_config::OrchestrationConfig;
994        use zeph_llm::mock::MockProvider;
995
996        let pool = test_pool().await;
997        let config = PlanCacheConfig {
998            enabled: true,
999            similarity_threshold: 0.5,
1000            ..PlanCacheConfig::default()
1001        };
1002        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();
1003
1004        // Pre-populate cache with a similar goal.
1005        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
1006        cache
1007            .cache_plan(&graph, &[1.0_f32, 0.0], "test-model")
1008            .await
1009            .unwrap();
1010
1011        let graph_json = r#"{"tasks": [
1012            {"task_id": "fallback-task-0", "title": "Fallback", "description": "planner fallback", "depends_on": []}
1013        ]}"#
1014        .to_string();
1015
1016        let provider = MockProvider::with_responses(vec![graph_json.clone()]);
1017        let planner = LlmPlanner::new(
1018            MockProvider::with_responses(vec![graph_json]),
1019            &OrchestrationConfig::default(),
1020        );
1021
1022        // embedding = None → must skip cache and call planner.
1023        let (result_graph, _) = plan_with_cache(
1024            &planner,
1025            Some(&cache),
1026            &provider,
1027            None, // no embedding provided
1028            "test-model",
1029            "deploy service",
1030            &[],
1031            20,
1032            Duration::from_mins(2),
1033        )
1034        .await
1035        .unwrap();
1036
1037        assert_eq!(result_graph.tasks[0].title, "Fallback");
1038    }
1039
1040    #[tokio::test]
1041    async fn adapt_plan_error_fallback_to_full_decomposition() {
1042        use crate::planner::LlmPlanner;
1043        use zeph_config::OrchestrationConfig;
1044        use zeph_llm::mock::MockProvider;
1045
1046        let pool = test_pool().await;
1047        let config = PlanCacheConfig {
1048            enabled: true,
1049            similarity_threshold: 0.5,
1050            ..PlanCacheConfig::default()
1051        };
1052        let cache = PlanCache::new(pool, config, "test-model").await.unwrap();
1053
1054        // Pre-populate cache with matching embedding.
1055        let graph = make_graph("deploy service", &[("Build", "build it", &[])]);
1056        cache
1057            .cache_plan(&graph, &[1.0_f32, 0.0], "test-model")
1058            .await
1059            .unwrap();
1060
1061        // Provider for adapt_plan returns invalid JSON — adaptation fails.
1062        let bad_provider = MockProvider::with_responses(vec!["not valid json".to_string()]);
1063
1064        // Planner (fallback path) returns a valid response.
1065        let fallback_json = r#"{"tasks": [
1066            {"task_id": "fallback-0", "title": "Fallback Task", "description": "via planner", "depends_on": []}
1067        ]}"#
1068        .to_string();
1069        let planner = LlmPlanner::new(
1070            MockProvider::with_responses(vec![fallback_json]),
1071            &OrchestrationConfig::default(),
1072        );
1073
1074        let (result_graph, _) = plan_with_cache(
1075            &planner,
1076            Some(&cache),
1077            &bad_provider, // adapt_plan will fail with this provider
1078            Some(&[1.0_f32, 0.0]),
1079            "test-model",
1080            "deploy service",
1081            &[],
1082            20,
1083            Duration::from_mins(2),
1084        )
1085        .await
1086        .unwrap();
1087
1088        // Must return planner fallback result, not error.
1089        assert_eq!(result_graph.tasks[0].title, "Fallback Task");
1090    }
1091}