1use std::sync::Arc;
7
8use async_trait::async_trait;
9use thiserror::Error;
10
11use crate::step::TaskStep;
12
13mod parse;
14mod validate;
15use parse::{parse_steps, RawStep};
16
/// Errors produced while turning a request (or a repair context) into a plan.
#[derive(Debug, Error)]
pub enum DecompositionError {
    /// The LLM provider call itself failed. `#[from]` lets `?` convert a
    /// provider error into this variant directly.
    #[error("LLM error: {0}")]
    Llm(#[from] cortex::llm::LlmError),
    /// The LLM reply could not be turned into an acceptable plan. Validation
    /// rejections are reported through this variant as well, e.g. an
    /// `execute` step with an empty command, a binary outside the sandbox
    /// allowlist, a shell pipeline, or an unregistered `implement` agent.
    #[error("Failed to parse LLM output: {0}")]
    Parse(String),
    /// The reply parsed to zero steps. This is also what the default
    /// `TaskDecomposer::replan_after_failure` returns for decomposers that do
    /// not override it.
    #[error("Empty plan — LLM produced no steps")]
    EmptyPlan,
}
26
/// Optional hints and constraints supplied to a [`TaskDecomposer`] alongside
/// the request.
///
/// Every list may be empty (that is the `Default`). `LlmDecomposer` omits
/// empty lists from its prompts.
#[derive(Debug, Default)]
pub struct DecompositionContext {
    /// Procedures known to work for similar tasks. Rendered as bullets in the
    /// initial decomposition prompt.
    pub known_procedures: Vec<String>,
    /// Binaries available in the sandbox. Listed in both the decomposition
    /// and the repair prompts. When non-empty, validation rejects `execute`
    /// commands whose binary is not in this list.
    pub available_tools: Vec<String>,
    /// Project facts relevant to the request. Rendered under "Relevant
    /// project context" in the initial decomposition prompt only.
    pub relevant_facts: Vec<String>,
    /// Names of credentials available to steps.
    /// NOTE(review): not read by the prompt builders in this file; presumably
    /// consumed by validation or by callers. Confirm.
    pub available_credentials: Vec<String>,
    /// Registered delegate agents for `implement` steps. Listed in both
    /// prompts. When non-empty, an `implement` step naming any other agent is
    /// rejected. When empty, agent names are not checked.
    pub available_agents: Vec<String>,
    /// Capabilities currently wired into the kernel. Rendered as bullets in
    /// the initial decomposition prompt (not in the repair prompt).
    pub available_capabilities: Vec<String>,
}
53
/// Everything a decomposer is given to plan a recovery after a step failed.
#[derive(Debug, Clone)]
pub struct RepairContext {
    /// The request the failed plan was built for. Restated at the top of the
    /// repair prompt.
    pub original_request: String,
    /// Text identifying the step that failed. Shown under "Failed step".
    /// NOTE(review): presumably the step's description; confirm with callers.
    pub failed_step: String,
    /// Error reported for the failed step. Shown under "Actual error".
    pub error: String,
    /// Steps that already succeeded. They are presented to the LLM as done
    /// ("do NOT redo"), together with their stdout.
    pub completed: Vec<CompletedStepRecap>,
}
70
/// Summary of one successfully completed step, fed back into repair prompts.
#[derive(Debug, Clone)]
pub struct CompletedStepRecap {
    /// Step description, rendered as a bullet in the repair prompt.
    pub description: String,
    /// Excerpt of the step's stdout. It is trimmed before use, and an empty
    /// excerpt is rendered as "(no stdout)".
    /// NOTE(review): the size limit of this excerpt is decided by the caller;
    /// confirm the truncation policy there.
    pub output_excerpt: String,
}
79
/// Turns a natural-language request into an ordered list of [`TaskStep`]s.
#[async_trait]
pub trait TaskDecomposer: Send + Sync {
    /// Decomposes `request` into steps, using `context` as hints and
    /// constraints.
    ///
    /// # Errors
    ///
    /// Returns a [`DecompositionError`] when no acceptable plan can be
    /// produced.
    async fn decompose(
        &self,
        request: &str,
        context: DecompositionContext,
    ) -> Result<Vec<TaskStep>, DecompositionError>;

    /// Produces a replacement plan after a step failed.
    ///
    /// The default implementation does not replan. It always returns
    /// [`DecompositionError::EmptyPlan`]. Implementors that support repair,
    /// such as `LlmDecomposer`, override it.
    async fn replan_after_failure(
        &self,
        _repair: RepairContext,
        _context: DecompositionContext,
    ) -> Result<Vec<TaskStep>, DecompositionError> {
        // No repair support by default: callers see this as "no plan".
        Err(DecompositionError::EmptyPlan)
    }
}
101
102pub struct LlmDecomposer {
104 llm: Arc<dyn cortex::LlmProvider>,
105}
106
107impl LlmDecomposer {
108 pub fn new(llm: Arc<dyn cortex::LlmProvider>) -> Self {
109 Self { llm }
110 }
111}
112
113impl LlmDecomposer {
114 async fn decompose_impl(
115 &self,
116 request: &str,
117 context: DecompositionContext,
118 ) -> Result<Vec<TaskStep>, DecompositionError> {
119 let mut user_prompt = format!("Decompose this request into steps:\n\n\"{request}\"");
120
121 if !context.known_procedures.is_empty() {
122 user_prompt.push_str("\n\nKnown procedures for similar tasks:\n");
123 for proc in &context.known_procedures {
124 user_prompt.push_str(&format!("- {proc}\n"));
125 }
126 }
127 if !context.relevant_facts.is_empty() {
128 user_prompt.push_str("\n\nRelevant project context:\n");
129 for fact in &context.relevant_facts {
130 user_prompt.push_str(&format!("- {fact}\n"));
131 }
132 }
133 if !context.available_tools.is_empty() {
134 user_prompt.push_str(
135 "\n\nAvailable sandbox binaries (every `execute`/`test` step MUST start with one of these — see system rules):\n ",
136 );
137 user_prompt.push_str(&context.available_tools.join(", "));
138 }
139 if !context.available_capabilities.is_empty() {
140 user_prompt.push_str(
141 "\n\nLive kernel capabilities (faculties wired right now — compose against these, do not invent others):\n",
142 );
143 for cap in &context.available_capabilities {
144 user_prompt.push_str(&format!("- {cap}\n"));
145 }
146 }
147 if !context.available_agents.is_empty() {
148 user_prompt.push_str(
149 "\n\nDelegate agents available for `implement` steps (the `agent` field MUST be exactly one of these):\n ",
150 );
151 user_prompt.push_str(&context.available_agents.join(", "));
152 }
153
154 let messages = vec![
155 cortex::llm::Message::system(crate::prompts::DECOMPOSE_SYSTEM),
156 cortex::llm::Message::user(user_prompt),
157 ];
158
159 let response = self.llm.generate(&messages).await?;
160 let mut raw_steps = parse_steps(&response.content)?;
161
162 if raw_steps.is_empty() {
163 return Err(DecompositionError::EmptyPlan);
164 }
165
166 validate::validate_steps(&raw_steps, &context)?;
167 validate::apply_sequential_fallback(&mut raw_steps);
168 Ok(validate::finalize(raw_steps))
169 }
170}
171
172impl LlmDecomposer {
173 async fn replan_inner(
174 &self,
175 repair: &RepairContext,
176 context: &DecompositionContext,
177 ) -> Result<Vec<RawStep>, DecompositionError> {
178 let mut user_prompt = format!(
179 "Original request:\n {}\n\nWhat already succeeded (do NOT redo). Each entry includes the actual stdout the step produced — base your next step on this real data, do not invent intermediate files:\n",
180 repair.original_request
181 );
182 if repair.completed.is_empty() {
183 user_prompt.push_str(" (nothing yet)\n");
184 } else {
185 for recap in &repair.completed {
186 user_prompt.push_str(&format!(" - {}\n", recap.description));
187 let excerpt = recap.output_excerpt.trim();
188 if excerpt.is_empty() {
189 user_prompt.push_str(" (no stdout)\n");
190 } else {
191 user_prompt.push_str(" stdout:\n");
192 for line in excerpt.lines() {
193 user_prompt.push_str(&format!(" {line}\n"));
194 }
195 }
196 }
197 }
198 user_prompt.push_str(&format!(
199 "\nFailed step:\n {}\n\nActual error:\n {}\n",
200 repair.failed_step, repair.error,
201 ));
202 if !context.available_tools.is_empty() {
203 user_prompt.push_str(
204 "\nAvailable sandbox binaries (for execute/test action_type — shell mode bypasses this):\n ",
205 );
206 user_prompt.push_str(&context.available_tools.join(", "));
207 }
208 if !context.available_agents.is_empty() {
209 user_prompt.push_str(
210 "\nDelegate agents available for `implement` steps (the `agent` field MUST be one of these):\n ",
211 );
212 user_prompt.push_str(&context.available_agents.join(", "));
213 }
214
215 let messages = vec![
216 cortex::llm::Message::system(crate::prompts::REPAIR_SYSTEM),
217 cortex::llm::Message::user(user_prompt),
218 ];
219
220 let response = self.llm.generate(&messages).await?;
221 parse_steps(&response.content)
222 }
223}
224
225#[async_trait]
226impl TaskDecomposer for LlmDecomposer {
227 async fn replan_after_failure(
228 &self,
229 repair: RepairContext,
230 context: DecompositionContext,
231 ) -> Result<Vec<TaskStep>, DecompositionError> {
232 let mut raw_steps = self.replan_inner(&repair, &context).await?;
233 if raw_steps.is_empty() {
234 return Err(DecompositionError::EmptyPlan);
235 }
236
237 validate::validate_steps(&raw_steps, &context)?;
241 validate::apply_sequential_fallback(&mut raw_steps);
242 Ok(validate::finalize(raw_steps))
243 }
244
245 async fn decompose(
246 &self,
247 request: &str,
248 context: DecompositionContext,
249 ) -> Result<Vec<TaskStep>, DecompositionError> {
250 self.decompose_impl(request, context).await
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::step::StepAction;

    /// Mock provider that answers every `generate` call with the same canned
    /// text. The decomposer never streams, so `generate_stream` is
    /// unreachable.
    struct CannedLlm(&'static str);

    #[async_trait]
    impl cortex::llm::LlmProvider for CannedLlm {
        async fn generate(
            &self,
            _messages: &[cortex::llm::Message],
        ) -> Result<cortex::llm::Response, cortex::llm::LlmError> {
            Ok(cortex::llm::Response::text(self.0, None))
        }
        async fn generate_stream(
            &self,
            _messages: &[cortex::llm::Message],
        ) -> Result<
            std::pin::Pin<
                Box<
                    dyn futures::Stream<
                            Item = Result<cortex::llm::ResponseChunk, cortex::llm::LlmError>,
                        > + Send,
                >,
            >,
            cortex::llm::LlmError,
        > {
            unreachable!("mock provider: the decomposer never streams")
        }
        async fn health_check(&self) -> bool {
            true
        }
        fn name(&self) -> &str {
            "test"
        }
        fn model(&self) -> &str {
            "test-model"
        }
        async fn list_models(&self) -> Result<Vec<String>, cortex::llm::LlmError> {
            Ok(vec!["test-model".into()])
        }
    }

    /// Builds an `LlmDecomposer` whose provider always replies with `reply`.
    fn canned_decomposer(reply: &'static str) -> LlmDecomposer {
        LlmDecomposer::new(std::sync::Arc::new(CannedLlm(reply)))
    }

    const IMPLEMENT_WITH_GHOST_AGENT: &str = r#"[
        {"description": "do the work", "action_type": "implement", "spec": "build it", "agent": "ghost-agent", "depends_on": []}
    ]"#;

    #[test]
    fn test_parse_steps_basic() {
        let json = r#"[
            {
                "description": "Research existing patterns",
                "action_type": "research",
                "query": "CSV export patterns",
                "depends_on": [],
                "tier": "read"
            },
            {
                "description": "Implement CSV endpoint",
                "action_type": "implement",
                "spec": "Add /api/export/csv endpoint",
                "agent": "claude-code",
                "depends_on": [0],
                "tier": "execute"
            }
        ]"#;

        let steps = parse_steps(json).unwrap();
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[0].action_type, "research");
        assert_eq!(steps[1].depends_on, vec![0]);
    }

    #[test]
    fn test_parse_steps_tolerates_null_fields() {
        let json = r#"[
            {
                "description": "do thing",
                "action_type": "shell",
                "command": "echo hi",
                "depends_on": null,
                "tier": null,
                "estimated_tokens": null,
                "spec": null
            }
        ]"#;
        let steps = parse_steps(json).expect("null fields should be lenient");
        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].action_type, "shell");
        assert!(steps[0].depends_on.is_empty());
        assert!(steps[0].tier.is_none());
    }

    #[test]
    fn test_parse_steps_tolerates_integer_tier() {
        let json = r#"[
            {"description": "x", "action_type": "shell", "command": "true", "tier": 1}
        ]"#;
        let steps = parse_steps(json).expect("integer tier should not break parse");
        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].tier.as_deref(), Some("1"));
    }

    #[test]
    fn test_parse_steps_tolerates_integer_string_fields() {
        let json = r#"[
            {"description": "noisy step", "action_type": "shell", "command": 0, "query": 1, "spec": 2.5, "tier": "read"}
        ]"#;
        let steps = parse_steps(json).expect("integer string-field values should not break parse");
        assert_eq!(steps.len(), 1);
        assert_eq!(steps[0].command.as_deref(), Some("0"));
        assert_eq!(steps[0].query.as_deref(), Some("1"));
        assert_eq!(steps[0].spec.as_deref(), Some("2.5"));
    }

    #[test]
    fn test_parse_steps_tolerates_integer_depends_on() {
        let json = r#"[
            {"description": "first", "action_type": "shell", "command": "true", "depends_on": []},
            {"description": "second", "action_type": "shell", "command": "true", "depends_on": 0}
        ]"#;
        let steps = parse_steps(json).expect("integer depends_on should not break parse");
        assert_eq!(steps.len(), 2);
        assert_eq!(steps[1].depends_on, vec![0]);
    }

    #[test]
    fn test_parse_steps_tolerates_empty_string_fields() {
        let json = r#"[
            {"description": "x", "action_type": "notify", "channel": "", "message": "hello", "depends_on": []}
        ]"#;
        let steps = parse_steps(json).unwrap();
        assert_eq!(steps.len(), 1);
        assert!(steps[0].channel.is_none());
        assert_eq!(steps[0].message.as_deref(), Some("hello"));
    }

    #[test]
    fn test_parse_steps_markdown_wrapped() {
        let json = r#"```json
[{"description": "Do something", "action_type": "plan", "depends_on": []}]
```"#;

        let steps = parse_steps(json).unwrap();
        assert_eq!(steps.len(), 1);
    }

    #[tokio::test]
    async fn rejects_execute_step_with_empty_command() {
        let decomposer = canned_decomposer(
            r#"[
                {"description": "run the script", "action_type": "execute", "command": "", "depends_on": []}
            ]"#,
        );
        let err = decomposer
            .decompose("anything", DecompositionContext::default())
            .await
            .unwrap_err();
        assert!(
            matches!(err, DecompositionError::Parse(_)),
            "expected parse-time rejection, got {err:?}"
        );
    }

    #[tokio::test]
    async fn rejects_execute_step_outside_sandbox_allowlist() {
        let decomposer = canned_decomposer(
            r#"[
                {"description": "check act installed", "action_type": "execute", "command": "which act", "depends_on": []}
            ]"#,
        );
        let ctx = DecompositionContext {
            available_tools: vec!["ls".into(), "grep".into(), "cargo".into()],
            ..Default::default()
        };
        let err = decomposer.decompose("anything", ctx).await.unwrap_err();
        match err {
            DecompositionError::Parse(msg) => {
                assert!(
                    msg.contains("which") && msg.contains("not on the sandbox allowlist"),
                    "expected allowlist-rejection message, got: {msg}"
                );
            }
            other => panic!("expected Parse error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn rejects_execute_step_with_pipeline() {
        let decomposer = canned_decomposer(
            r#"[
                {"description": "pipeline step", "action_type": "execute", "command": "ls | grep foo", "depends_on": []}
            ]"#,
        );
        let err = decomposer
            .decompose("anything", DecompositionContext::default())
            .await
            .unwrap_err();
        assert!(
            matches!(err, DecompositionError::Parse(_)),
            "expected parse-time rejection of pipeline, got {err:?}"
        );
    }

    #[tokio::test]
    async fn test_sequential_fallback_links_dependencyless_plans() {
        let decomposer = canned_decomposer(
            r#"[
                {"description": "scan dir", "action_type": "research", "depends_on": []},
                {"description": "write script", "action_type": "implement", "depends_on": []},
                {"description": "run script", "action_type": "execute", "command": "echo hi", "depends_on": []},
                {"description": "notify user", "action_type": "notify", "depends_on": []}
            ]"#,
        );
        let steps = decomposer
            .decompose("do something", DecompositionContext::default())
            .await
            .unwrap();

        assert_eq!(steps.len(), 4);
        assert!(steps[0].depends_on.is_empty());
        assert_eq!(steps[1].depends_on, vec![steps[0].id.clone()]);
        assert_eq!(steps[2].depends_on, vec![steps[1].id.clone()]);
        assert_eq!(steps[3].depends_on, vec![steps[2].id.clone()]);
    }

    #[tokio::test]
    async fn rejects_implement_step_with_unregistered_agent() {
        let decomposer = canned_decomposer(IMPLEMENT_WITH_GHOST_AGENT);
        let ctx = DecompositionContext {
            available_agents: vec!["claude-code".into(), "qwen".into()],
            ..Default::default()
        };
        let err = decomposer.decompose("anything", ctx).await.unwrap_err();
        match err {
            DecompositionError::Parse(msg) => {
                assert!(
                    msg.contains("ghost-agent") && msg.contains("not registered"),
                    "expected agent-rejection message, got: {msg}"
                );
                assert!(
                    msg.contains("claude-code") && msg.contains("qwen"),
                    "rejection should list available agents, got: {msg}"
                );
            }
            other => panic!("expected Parse error, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn accepts_implement_step_with_registered_agent() {
        let decomposer = canned_decomposer(
            r#"[{"description": "do the work", "action_type": "implement", "spec": "build it", "agent": "claude-code", "depends_on": []}]"#,
        );
        let ctx = DecompositionContext {
            available_agents: vec!["claude-code".into(), "qwen".into()],
            ..Default::default()
        };
        let steps = decomposer.decompose("anything", ctx).await.unwrap();
        assert_eq!(steps.len(), 1);
        assert!(matches!(
            &steps[0].action,
            StepAction::Implement { agent, .. } if agent == "claude-code"
        ));
    }

    #[tokio::test]
    async fn skips_agent_validation_when_roster_unknown() {
        let decomposer = canned_decomposer(IMPLEMENT_WITH_GHOST_AGENT);
        let steps = decomposer
            .decompose("anything", DecompositionContext::default())
            .await
            .unwrap();
        assert_eq!(steps.len(), 1);
        assert!(matches!(
            &steps[0].action,
            StepAction::Implement { agent, .. } if agent == "ghost-agent"
        ));
    }
}