Skip to main content

ralph/commands/task/decompose/
mod.rs

1//! Task decomposition planning and queue materialization helpers.
2//!
3//! Responsibilities:
4//! - Resolve decomposition sources and attach targets.
5//! - Run the planner prompt and turn JSON into durable queue tasks.
6//! - Enforce child-policy and sibling-dependency semantics for preview/write flows.
7//!
8//! Not handled here:
9//! - CLI parsing or terminal formatting details.
10//! - Direct queue editing by runners.
11//!
12//! Invariants/assumptions:
13//! - Preview stays side-effect free with respect to queue/done files.
14//! - Write mode re-checks queue state under lock before mutating.
15
16mod support;
17#[cfg(test)]
18mod tests;
19mod tree;
20
21use super::resolve_task_runner_settings;
22use crate::commands::run::PhaseType;
23use crate::contracts::{Model, ProjectType, QueueFile, ReasoningEffort, Runner, Task, TaskStatus};
24use crate::{config, prompts, queue, runner, runutil, timeutil};
25use anyhow::{Context, Result, bail};
26use serde::{Deserialize, Serialize};
27use support::{
28    allocate_sequential_ids, annotate_parent, created_node_count, descendant_ids_for_parent,
29    done_queue_ref, ensure_subtree_is_replaceable, insertion_index, kind_for_source,
30    looks_like_task_id, materialize_children, materialize_node, request_context,
31};
32use tree::normalize_response;
33
/// How a decomposition write treats child tasks that already exist under the effective parent.
///
/// Serialized in `snake_case` (`fail`, `append`, `replace`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DecompositionChildPolicy {
    /// Refuse the write when the parent already has child tasks.
    Fail,
    /// Add the new tasks without checking for, or removing, existing children.
    Append,
    /// Remove the parent's existing descendants (after a replaceability check) and insert the
    /// new tasks in their place.
    Replace,
}
41
/// Caller-supplied options that drive planning of a task decomposition.
#[derive(Debug, Clone)]
pub struct TaskDecomposeOptions {
    /// Raw source text; it is trimmed and then resolved as either an existing task ID or a
    /// freeform request.
    pub source_input: String,
    /// Optional ID of an existing active task to attach the new subtree under.
    pub attach_to_task_id: Option<String>,
    /// Depth limit forwarded to the planner prompt.
    pub max_depth: u8,
    /// Per-node child limit forwarded to the planner prompt.
    pub max_children: usize,
    /// Total node limit forwarded to the planner prompt.
    pub max_nodes: usize,
    /// Status recorded on the preview as `child_status` and handed to task materialization.
    pub status: TaskStatus,
    /// Policy for children that already exist under the effective parent.
    pub child_policy: DecompositionChildPolicy,
    /// Forwarded to the planner prompt and recorded on the preview.
    // NOTE(review): presumably enables sibling dependency inference — confirm in `tree`.
    pub with_dependencies: bool,
    /// Runner override passed to `resolve_task_runner_settings`.
    pub runner_override: Option<Runner>,
    /// Model override passed to `resolve_task_runner_settings`.
    pub model_override: Option<Model>,
    /// Reasoning-effort override passed to `resolve_task_runner_settings`.
    pub reasoning_effort_override: Option<ReasoningEffort>,
    /// Runner CLI option patch passed to `resolve_task_runner_settings`.
    pub runner_cli_overrides: crate::contracts::RunnerCliOptionsPatch,
    /// Flag passed to `prompts::wrap_with_repoprompt_requirement` when building the prompt.
    pub repoprompt_tool_injection: bool,
}
58
/// What is being decomposed.
///
/// Serialized as an internally tagged enum with a `kind` field (`freeform` / `existing_task`).
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum DecompositionSource {
    /// A freeform request string that did not look like a task ID.
    Freeform { request: String },
    /// A snapshot (clone) of an existing task taken when the source was resolved.
    ExistingTask { task: Box<Task> },
}
65
/// An existing task that a freeform decomposition will be attached under.
#[derive(Debug, Clone, Serialize)]
pub struct DecompositionAttachTarget {
    /// Snapshot (clone) of the target task taken at planning time.
    pub task: Box<Task>,
    /// Whether the hierarchy index reported any children for the target at planning time.
    pub has_existing_children: bool,
}
71
/// Result of planning a decomposition; the input to `write_task_decomposition`.
#[derive(Debug, Clone, Serialize)]
pub struct DecompositionPreview {
    /// The resolved decomposition source.
    pub source: DecompositionSource,
    /// The attach target, when one was requested.
    pub attach_target: Option<DecompositionAttachTarget>,
    /// The normalized plan produced from the planner response.
    pub plan: DecompositionPlan,
    /// Reasons a write would be refused; a non-empty list makes the write bail immediately.
    pub write_blockers: Vec<String>,
    /// Status passed to task materialization for the created tasks (copied from the options).
    pub child_status: TaskStatus,
    /// Child policy copied from the options.
    pub child_policy: DecompositionChildPolicy,
    /// Dependency flag copied from the options.
    pub with_dependencies: bool,
}
82
/// Normalized planner output: the task tree plus summary data for previews.
// NOTE(review): instances are built by `tree::normalize_response`; the exact meaning of the
// counters is defined there — confirm before relying on them.
#[derive(Debug, Clone, Serialize)]
pub struct DecompositionPlan {
    /// Root node of the planned tree.
    pub root: PlannedNode,
    /// Warnings to surface alongside the plan.
    pub warnings: Vec<String>,
    /// Number of nodes in the plan (presumably including the root — confirm in `tree`).
    pub total_nodes: usize,
    /// Number of nodes without children (presumably — confirm in `tree`).
    pub leaf_nodes: usize,
    /// Dependency edges expressed by title, for display.
    pub dependency_edges: Vec<DependencyEdgePreview>,
}
91
/// A single dependency edge between two planned tasks, identified by title for display.
#[derive(Debug, Clone, Serialize)]
pub struct DependencyEdgePreview {
    /// Title of the task that has the dependency.
    pub task_title: String,
    /// Title of the task it depends on.
    pub depends_on_title: String,
}
97
/// Summary of what `write_task_decomposition` changed in the active queue.
#[derive(Debug, Clone, Serialize)]
pub struct TaskDecomposeWriteResult {
    /// ID of the first created task; `None` when an existing task was decomposed in place
    /// (existing-task source without an attach target).
    pub root_task_id: Option<String>,
    /// ID of the effective parent the new tasks were placed under, if any.
    pub parent_task_id: Option<String>,
    /// IDs of all created tasks, in creation order.
    pub created_ids: Vec<String>,
    /// IDs of the pre-existing descendants when the child policy is `Replace`; empty otherwise.
    pub replaced_ids: Vec<String>,
    /// `true` when there was an attach target or the source was an existing task.
    pub parent_annotated: bool,
}
106
/// Top-level JSON shape expected from the planner, before normalization.
///
/// Unknown fields are rejected during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
struct RawDecompositionResponse {
    /// Planner-reported warnings; defaults to empty when the field is absent.
    #[serde(default)]
    warnings: Vec<String>,
    /// Root of the planned task tree (required).
    tree: RawPlannedNode,
}
114
/// One node of the planner's JSON tree, before normalization.
///
/// Only `title` is required; every other field falls back to its default when absent.
/// Unknown fields are rejected during deserialization.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
struct RawPlannedNode {
    /// Optional planner-assigned key for this node.
    // NOTE(review): presumably what `depends_on` entries refer to — confirm in `tree`.
    #[serde(default)]
    key: Option<String>,
    /// Task title (required).
    title: String,
    /// Optional task description.
    #[serde(default)]
    description: Option<String>,
    /// Plan entries for the task.
    #[serde(default)]
    plan: Vec<String>,
    /// Tags for the task.
    #[serde(default)]
    tags: Vec<String>,
    /// Scope entries for the task.
    #[serde(default)]
    scope: Vec<String>,
    /// References to nodes this node depends on.
    #[serde(default)]
    depends_on: Vec<String>,
    /// Child nodes.
    #[serde(default)]
    children: Vec<RawPlannedNode>,
}
134
/// A normalized node of the decomposition plan; one node corresponds to one task to create.
#[derive(Debug, Clone, Serialize)]
pub struct PlannedNode {
    /// Key identifying this node within the plan.
    pub planner_key: String,
    /// Task title.
    pub title: String,
    /// Optional task description.
    pub description: Option<String>,
    /// Plan entries for the task.
    pub plan: Vec<String>,
    /// Tags for the task.
    pub tags: Vec<String>,
    /// Scope entries for the task.
    pub scope: Vec<String>,
    /// Keys of the nodes this node depends on.
    // NOTE(review): presumably resolved planner keys of siblings — confirm in `tree`.
    pub depends_on_keys: Vec<String>,
    /// Child nodes.
    pub children: Vec<PlannedNode>,
    /// Internal dependency references; never serialized.
    // NOTE(review): presumably the raw `depends_on` values kept for resolution — confirm in `tree`.
    #[serde(skip_serializing)]
    dependency_refs: Vec<String>,
}
148
/// Payload-free discriminant mirroring the variants of `DecompositionSource`.
// NOTE(review): presumably what `kind_for_source` returns for `normalize_response` — confirm in
// `support`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceKind {
    /// Corresponds to `DecompositionSource::Freeform`.
    Freeform,
    /// Corresponds to `DecompositionSource::ExistingTask`.
    ExistingTask,
}
154
/// Mutable bookkeeping for plan normalization.
// NOTE(review): not referenced in this file; presumably threaded through `tree` while
// normalizing the planner response — confirm there.
struct PlannerState {
    /// Node budget still available (presumably counts down from `max_nodes` — confirm).
    remaining_nodes: usize,
    /// Warnings accumulated so far.
    warnings: Vec<String>,
    /// Whether dependency handling is enabled.
    with_dependencies: bool,
}
160
161pub fn plan_task_decomposition(
162    resolved: &config::Resolved,
163    opts: &TaskDecomposeOptions,
164) -> Result<DecompositionPreview> {
165    let (active, done) = queue::load_and_validate_queues(resolved, true)?;
166    let source = resolve_source(resolved, &active, done.as_ref(), opts.source_input.trim())?;
167    let attach_target = resolve_attach_target(
168        resolved,
169        &active,
170        done.as_ref(),
171        opts.attach_to_task_id.as_deref(),
172        &source,
173    )?;
174
175    let template = prompts::load_task_decompose_prompt(&resolved.repo_root)?;
176    let prompt = build_planner_prompt(resolved, opts, &source, attach_target.as_ref(), &template)?;
177    let settings = resolve_task_runner_settings(
178        resolved,
179        opts.runner_override.clone(),
180        opts.model_override.clone(),
181        opts.reasoning_effort_override,
182        &opts.runner_cli_overrides,
183    )?;
184    let bins = runner::resolve_binaries(&resolved.config.agent);
185    let retry_policy = runutil::RunnerRetryPolicy::from_config(&resolved.config.agent.runner_retry)
186        .unwrap_or_default();
187
188    let output = runutil::run_prompt_with_handling(
189        runutil::RunnerInvocation {
190            repo_root: &resolved.repo_root,
191            runner_kind: settings.runner,
192            bins,
193            model: settings.model,
194            reasoning_effort: settings.reasoning_effort,
195            runner_cli: settings.runner_cli,
196            prompt: &prompt,
197            timeout: None,
198            permission_mode: settings.permission_mode,
199            revert_on_error: false,
200            git_revert_mode: resolved
201                .config
202                .agent
203                .git_revert_mode
204                .unwrap_or(crate::contracts::GitRevertMode::Ask),
205            output_handler: None,
206            output_stream: runner::OutputStream::Terminal,
207            revert_prompt: None,
208            phase_type: PhaseType::SinglePhase,
209            session_id: None,
210            retry_policy,
211        },
212        runutil::RunnerErrorMessages {
213            log_label: "task decompose planner",
214            interrupted_msg: "Task decomposition interrupted: the planner run was canceled.",
215            timeout_msg: "Task decomposition timed out before a plan was returned.",
216            terminated_msg: "Task decomposition terminated: the planner was stopped by a signal.",
217            non_zero_msg: |code| {
218                format!(
219                    "Task decomposition failed: the planner exited with a non-zero code ({code})."
220                )
221            },
222            other_msg: |err| {
223                format!(
224                    "Task decomposition failed: the planner could not be started or encountered an error. Error: {:#}",
225                    err
226                )
227            },
228        },
229    )?;
230
231    let planner_text = extract_planner_text(&output.stdout).context(
232        "Task decomposition planner did not produce a final assistant response containing JSON.",
233    )?;
234    let raw = parse_planner_response(&planner_text)?;
235    let default_root_title = match &source {
236        DecompositionSource::Freeform { request } => request.clone(),
237        DecompositionSource::ExistingTask { task } => task.title.clone(),
238    };
239    let plan = normalize_response(raw, kind_for_source(&source), opts, &default_root_title)?;
240    let write_blockers = compute_write_blockers(
241        &active,
242        done.as_ref(),
243        &source,
244        attach_target.as_ref(),
245        opts.child_policy,
246    )?;
247
248    Ok(DecompositionPreview {
249        source,
250        attach_target,
251        plan,
252        write_blockers,
253        child_status: opts.status,
254        child_policy: opts.child_policy,
255        with_dependencies: opts.with_dependencies,
256    })
257}
258
259pub fn write_task_decomposition(
260    resolved: &config::Resolved,
261    preview: &DecompositionPreview,
262    force: bool,
263) -> Result<TaskDecomposeWriteResult> {
264    if !preview.write_blockers.is_empty() {
265        bail!(preview.write_blockers.join("\n"));
266    }
267
268    let _queue_lock = queue::acquire_queue_lock(&resolved.repo_root, "task decompose", force)?;
269    let mut active = queue::load_queue(&resolved.queue_path)?;
270    let done = queue::load_queue_or_default(&resolved.done_path)?;
271    let done_ref = done_queue_ref(&done, &resolved.done_path);
272    let max_depth = resolved.config.queue.max_dependency_depth.unwrap_or(10);
273    queue::validate_queue_set(
274        &active,
275        done_ref,
276        &resolved.id_prefix,
277        resolved.id_width,
278        max_depth,
279    )
280    .context("validate queue set before task decompose write")?;
281
282    let effective_parent = resolve_effective_parent_for_write(&active, done_ref, preview)?;
283    let existing_descendant_ids = effective_parent
284        .as_ref()
285        .map(|task| descendant_ids_for_parent(&active, task.id.as_str()))
286        .transpose()?
287        .unwrap_or_default();
288
289    match preview.child_policy {
290        DecompositionChildPolicy::Fail => {
291            if !existing_descendant_ids.is_empty() {
292                let parent_id = effective_parent
293                    .as_ref()
294                    .map(|task| task.id.as_str())
295                    .unwrap_or("");
296                bail!(
297                    "Task {} already has child tasks. Refusing write for `ralph task decompose --child-policy fail`.",
298                    parent_id
299                );
300            }
301        }
302        DecompositionChildPolicy::Replace => {
303            if !existing_descendant_ids.is_empty() {
304                ensure_subtree_is_replaceable(&active, done_ref, &existing_descendant_ids)?;
305            }
306        }
307        DecompositionChildPolicy::Append => {}
308    }
309
310    crate::undo::create_undo_snapshot(
311        resolved,
312        &match (&preview.source, preview.attach_target.as_ref()) {
313            (DecompositionSource::Freeform { request }, None) => {
314                format!("task decompose write for request '{request}'")
315            }
316            (DecompositionSource::Freeform { request }, Some(parent)) => {
317                format!(
318                    "task decompose attach request '{}' under {}",
319                    request, parent.task.id
320                )
321            }
322            (DecompositionSource::ExistingTask { task }, None) => {
323                format!("task decompose {} into child tasks", task.id)
324            }
325            (DecompositionSource::ExistingTask { task }, Some(parent)) => {
326                format!(
327                    "task decompose {} attached under {}",
328                    task.id, parent.task.id
329                )
330            }
331        },
332    )?;
333
334    let created_count = created_node_count(preview);
335    if created_count == 0 {
336        bail!("Task decomposition produced no child tasks to write.");
337    }
338
339    let ids = allocate_sequential_ids(
340        &active,
341        done_ref,
342        &resolved.id_prefix,
343        resolved.id_width,
344        max_depth,
345        created_count,
346    )?;
347    let now = timeutil::now_utc_rfc3339()?;
348    let request_context = request_context(preview);
349    let mut next_id_index = 0usize;
350
351    let mut created_tasks = match (&preview.source, effective_parent.as_ref()) {
352        (DecompositionSource::ExistingTask { task: _ }, Some(parent))
353            if preview.attach_target.is_none() =>
354        {
355            materialize_children(
356                &preview.plan.root.children,
357                Some(parent.id.as_str()),
358                &ids,
359                &mut next_id_index,
360                preview.child_status,
361                &request_context,
362                &now,
363            )?
364        }
365        (_, Some(parent)) => {
366            let root_task = materialize_node(
367                &preview.plan.root,
368                Some(parent.id.as_str()),
369                &ids,
370                &mut next_id_index,
371                preview.child_status,
372                &request_context,
373                &now,
374            )?;
375            let root_id = root_task.id.clone();
376            let mut tasks = vec![root_task];
377            tasks.extend(materialize_children(
378                &preview.plan.root.children,
379                Some(root_id.as_str()),
380                &ids,
381                &mut next_id_index,
382                preview.child_status,
383                &request_context,
384                &now,
385            )?);
386            tasks
387        }
388        (_, None) => {
389            let root_task = materialize_node(
390                &preview.plan.root,
391                None,
392                &ids,
393                &mut next_id_index,
394                preview.child_status,
395                &request_context,
396                &now,
397            )?;
398            let root_id = root_task.id.clone();
399            let mut tasks = vec![root_task];
400            tasks.extend(materialize_children(
401                &preview.plan.root.children,
402                Some(root_id.as_str()),
403                &ids,
404                &mut next_id_index,
405                preview.child_status,
406                &request_context,
407                &now,
408            )?);
409            tasks
410        }
411    };
412
413    let root_task_id = match (&preview.source, preview.attach_target.as_ref()) {
414        (DecompositionSource::ExistingTask { .. }, None) => None,
415        _ => created_tasks.first().map(|task| task.id.clone()),
416    };
417    let parent_task_id = effective_parent.as_ref().map(|task| task.id.clone());
418    let created_ids = created_tasks
419        .iter()
420        .map(|task| task.id.clone())
421        .collect::<Vec<_>>();
422    let replaced_ids = if preview.child_policy == DecompositionChildPolicy::Replace {
423        existing_descendant_ids.iter().cloned().collect::<Vec<_>>()
424    } else {
425        Vec::new()
426    };
427
428    let removed_ids = existing_descendant_ids;
429    if !removed_ids.is_empty() && preview.child_policy == DecompositionChildPolicy::Replace {
430        active
431            .tasks
432            .retain(|task| !removed_ids.contains(task.id.as_str()));
433    }
434
435    let insert_at = insertion_index(
436        &active,
437        effective_parent.as_ref(),
438        &removed_ids,
439        preview.child_policy,
440    )?;
441
442    if let Some(parent) = effective_parent {
443        annotate_parent(
444            &mut active,
445            &parent.id,
446            &preview.source,
447            preview.attach_target.as_ref(),
448            &created_tasks,
449            &now,
450        )?;
451    }
452
453    for (offset, task) in created_tasks.drain(..).enumerate() {
454        active.tasks.insert(insert_at + offset, task);
455    }
456
457    queue::validate_queue_set(
458        &active,
459        done_ref,
460        &resolved.id_prefix,
461        resolved.id_width,
462        max_depth,
463    )
464    .context("validate queue set after task decompose write")?;
465    queue::save_queue(&resolved.queue_path, &active)?;
466
467    Ok(TaskDecomposeWriteResult {
468        root_task_id,
469        parent_task_id,
470        created_ids,
471        replaced_ids,
472        parent_annotated: preview.attach_target.is_some()
473            || matches!(preview.source, DecompositionSource::ExistingTask { .. }),
474    })
475}
476
477fn resolve_source(
478    resolved: &config::Resolved,
479    active: &QueueFile,
480    done: Option<&QueueFile>,
481    source_input: &str,
482) -> Result<DecompositionSource> {
483    if source_input.is_empty() {
484        bail!("Missing source: task decompose requires a task ID or freeform request.");
485    }
486    if looks_like_task_id(source_input, &resolved.id_prefix, resolved.id_width) {
487        let task = queue::operations::find_task_across(active, done, source_input)
488            .with_context(|| format!("Unknown task ID '{source_input}' for task decomposition."))?;
489        if done.is_some_and(|done_file| {
490            queue::operations::find_task(done_file, source_input).is_some()
491        }) {
492            bail!(
493                "Task {} is in the done archive. `ralph task decompose` only supports active tasks unless explicitly overridden.",
494                source_input
495            );
496        }
497        ensure_existing_task_is_supported(task)?;
498        return Ok(DecompositionSource::ExistingTask {
499            task: Box::new(task.clone()),
500        });
501    }
502
503    Ok(DecompositionSource::Freeform {
504        request: source_input.to_string(),
505    })
506}
507
508fn resolve_attach_target(
509    resolved: &config::Resolved,
510    active: &QueueFile,
511    done: Option<&QueueFile>,
512    attach_to: Option<&str>,
513    source: &DecompositionSource,
514) -> Result<Option<DecompositionAttachTarget>> {
515    let Some(attach_to) = attach_to.map(str::trim).filter(|value| !value.is_empty()) else {
516        return Ok(None);
517    };
518    if !looks_like_task_id(attach_to, &resolved.id_prefix, resolved.id_width) {
519        bail!(
520            "Invalid attach target '{}': expected a task ID like {}-0001.",
521            attach_to,
522            queue::normalize_prefix(&resolved.id_prefix)
523        );
524    }
525    if matches!(source, DecompositionSource::ExistingTask { .. }) {
526        bail!(
527            "`ralph task decompose --attach-to` only supports freeform request sources. Use either an existing task source or --attach-to, not both."
528        );
529    }
530    let task = queue::operations::find_task_across(active, done, attach_to)
531        .with_context(|| format!("Unknown attach target '{attach_to}' for task decomposition."))?;
532    if done.is_some_and(|done_file| queue::operations::find_task(done_file, attach_to).is_some()) {
533        bail!(
534            "Task {} is in the done archive and cannot be used as an attach target.",
535            attach_to
536        );
537    }
538    ensure_existing_task_is_supported(task)?;
539    let hierarchy = queue::hierarchy::HierarchyIndex::build(active, done);
540    Ok(Some(DecompositionAttachTarget {
541        task: Box::new(task.clone()),
542        has_existing_children: !hierarchy.children_of(&task.id).is_empty(),
543    }))
544}
545
546fn resolve_effective_parent_for_write(
547    active: &QueueFile,
548    done: Option<&QueueFile>,
549    preview: &DecompositionPreview,
550) -> Result<Option<Task>> {
551    if let Some(attach_target) = &preview.attach_target {
552        let task =
553            queue::operations::find_task(active, &attach_target.task.id).with_context(|| {
554                crate::error_messages::source_task_not_found(&attach_target.task.id, false)
555            })?;
556        ensure_existing_task_is_supported(task)?;
557        return Ok(Some(task.clone()));
558    }
559    match &preview.source {
560        DecompositionSource::Freeform { .. } => Ok(None),
561        DecompositionSource::ExistingTask { task } => {
562            let active_task = queue::operations::find_task(active, &task.id)
563                .with_context(|| crate::error_messages::source_task_not_found(&task.id, false))?;
564            if done.is_some_and(|done_file| {
565                queue::operations::find_task(done_file, &task.id).is_some()
566            }) {
567                bail!(
568                    "Task {} is in the done archive and cannot be decomposed in-place.",
569                    task.id
570                );
571            }
572            ensure_existing_task_is_supported(active_task)?;
573            Ok(Some(active_task.clone()))
574        }
575    }
576}
577
578fn ensure_existing_task_is_supported(task: &Task) -> Result<()> {
579    if matches!(task.status, TaskStatus::Done | TaskStatus::Rejected) {
580        bail!(
581            "Task {} has terminal status {} and cannot be decomposed without an explicit override.",
582            task.id,
583            task.status
584        );
585    }
586    Ok(())
587}
588
589fn compute_write_blockers(
590    active: &QueueFile,
591    done: Option<&QueueFile>,
592    source: &DecompositionSource,
593    attach_target: Option<&DecompositionAttachTarget>,
594    child_policy: DecompositionChildPolicy,
595) -> Result<Vec<String>> {
596    let mut write_blockers = Vec::new();
597    let effective_parent_id = attach_target
598        .map(|target| target.task.id.clone())
599        .or_else(|| match source {
600            DecompositionSource::ExistingTask { task } => Some(task.id.clone()),
601            DecompositionSource::Freeform { .. } => None,
602        });
603
604    if let Some(parent_id) = effective_parent_id {
605        let descendant_ids = descendant_ids_for_parent(active, parent_id.as_str())?;
606        let has_existing_children = !descendant_ids.is_empty();
607        match child_policy {
608            DecompositionChildPolicy::Fail if has_existing_children => {
609                write_blockers.push(format!(
610                    "Write blocked: task {} already has child tasks and --child-policy is set to fail.",
611                    parent_id
612                ));
613            }
614            DecompositionChildPolicy::Replace if has_existing_children => {
615                if let Err(err) = ensure_subtree_is_replaceable(active, done, &descendant_ids) {
616                    write_blockers.push(err.to_string());
617                }
618            }
619            _ => {}
620        }
621    }
622    Ok(write_blockers)
623}
624
625fn build_planner_prompt(
626    resolved: &config::Resolved,
627    opts: &TaskDecomposeOptions,
628    source: &DecompositionSource,
629    attach_target: Option<&DecompositionAttachTarget>,
630    template: &str,
631) -> Result<String> {
632    let (source_mode, source_request, source_task_json) = match source {
633        DecompositionSource::Freeform { request } => ("freeform", request.clone(), String::new()),
634        DecompositionSource::ExistingTask { task } => (
635            "existing_task",
636            task.request.clone().unwrap_or_else(|| task.title.clone()),
637            serde_json::to_string_pretty(task)
638                .context("serialize source task for decomposition")?,
639        ),
640    };
641    let attach_target_json = attach_target
642        .map(|target| {
643            serde_json::to_string_pretty(&target.task)
644                .context("serialize attach target for decomposition")
645        })
646        .transpose()?
647        .unwrap_or_default();
648    let project_type = resolved.config.project_type.unwrap_or(ProjectType::Code);
649    let mut prompt = prompts::render_task_decompose_prompt(
650        template,
651        source_mode,
652        &source_request,
653        &source_task_json,
654        &attach_target_json,
655        opts.max_depth,
656        opts.max_children,
657        opts.max_nodes,
658        opts.child_policy,
659        opts.with_dependencies,
660        project_type,
661        &resolved.config,
662    )?;
663    prompt = prompts::wrap_with_repoprompt_requirement(&prompt, opts.repoprompt_tool_injection);
664    prompts::wrap_with_instruction_files(&resolved.repo_root, &prompt, &resolved.config)
665}
666
667fn extract_planner_text(stdout: &str) -> Option<String> {
668    runner::extract_final_assistant_response(stdout).or_else(|| {
669        let trimmed = stdout.trim();
670        if trimmed.starts_with('{') && trimmed.ends_with('}') {
671            Some(trimmed.to_string())
672        } else {
673            None
674        }
675    })
676}
677
678fn parse_planner_response(raw_text: &str) -> Result<RawDecompositionResponse> {
679    let stripped = strip_code_fences(raw_text.trim());
680    serde_json::from_str::<RawDecompositionResponse>(stripped)
681        .or_else(|_| match extract_json_object(stripped) {
682            Some(candidate) => serde_json::from_str::<RawDecompositionResponse>(&candidate),
683            None => Err(serde_json::Error::io(std::io::Error::new(
684                std::io::ErrorKind::InvalidData,
685                "no JSON object found in planner response",
686            ))),
687        })
688        .context("parse task decomposition planner JSON")
689}
690
/// Removes a surrounding Markdown code fence from `raw`, returning the trimmed inner text.
///
/// The input is trimmed first. If it starts with a ``` fence and contains a later closing
/// fence, the text between them is returned (anything after the last closing fence is
/// dropped). The first line after the opening fence is discarded only when it is empty or
/// looks like a language tag such as `json`; if content starts on the fence line itself
/// (for example "```{"), that content is kept.
///
/// Input that is not fenced, or that has no closing fence, is returned trimmed and otherwise
/// unchanged.
fn strip_code_fences(raw: &str) -> &str {
    let trimmed = raw.trim();
    let Some(inner) = trimmed.strip_prefix("```") else {
        return trimmed;
    };
    let Some(end) = inner.rfind("```") else {
        return trimmed;
    };

    let body = &inner[..end];
    match body.split_once('\n') {
        // Drop the opening line only when it carries nothing but an (optional) language tag.
        Some((opening_line, rest)) if is_fence_language_tag(opening_line) => rest.trim(),
        _ => body.trim(),
    }
}

/// Returns `true` when `line` is blank or consists only of characters that appear in a fence
/// language tag (ASCII letters/digits and `-`, `_`, `+`, `.`, `#`).
fn is_fence_language_tag(line: &str) -> bool {
    line.trim()
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '+' | '.' | '#'))
}
704
/// Returns the span of `raw` from its first `{` to its last `}` (inclusive), if the opening
/// brace comes before the closing one.
///
/// No JSON validation is performed; the caller is expected to parse the result.
fn extract_json_object(raw: &str) -> Option<String> {
    match (raw.find('{'), raw.rfind('}')) {
        (Some(open), Some(close)) if open < close => Some(raw[open..=close].to_owned()),
        _ => None,
    }
}