collet 0.1.0

Relentless agentic coding orchestrator with zero-drop agent loops
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
//! Phase 3-5: worktree management, conflict resolution, verification, and result merging.

use tokio::sync::mpsc;

use crate::agent::r#loop::AgentEvent;
use crate::agent::subagent::SubagentResult;
use crate::agent::swarm::config::ConflictResolution;
use crate::agent::swarm::conflict;
use crate::api::Content;
use crate::api::models::{ChatRequest, Message};
use crate::api::provider::OpenAiCompatibleProvider;

use super::*;

impl SwarmCoordinator {
    /// Set up a dedicated, detached git worktree in which a single agent can operate.
    ///
    /// The worktree lives at `<collet_home>/worktrees/<project>/<task_id>`, where
    /// `<collet_home>` is `self.config.collet_home` and `<project>` is the final
    /// path component of `self.working_dir` (or `"project"` when it has none).
    ///
    /// NOTE(review): earlier documentation called the root `agents_dir` (default
    /// `~/.agents`, configurable through `[paths] agents_dir` in config.toml or the
    /// `COLLET_AGENTS_DIR` env var) — confirm that this is what `collet_home` holds.
    ///
    /// Returns `Some(path)` once `git worktree add --detach` exits successfully.
    /// Returns `None` when the worktree path is not valid UTF-8, when `git` cannot
    /// be launched, or when the command exits with a failure status.
    pub(super) async fn create_worktree(&self, task_id: &str) -> Option<String> {
        let project_name = match std::path::Path::new(&self.working_dir).file_name() {
            Some(name) => name.to_string_lossy().into_owned(),
            None => "project".to_string(),
        };

        let worktrees_root = self.config.collet_home.join("worktrees");
        let wt_path = worktrees_root.join(&project_name).join(task_id);

        // Best effort: if the parent cannot be created, `git worktree add` below
        // fails and reports the problem through its own exit status.
        if let Some(parent_dir) = wt_path.parent() {
            let _ = tokio::fs::create_dir_all(parent_dir).await;
        }

        // git receives the path as a string argument, so a non-UTF-8 path ends here.
        let wt_str = wt_path.to_str()?;

        let spawned = tokio::process::Command::new("git")
            .arg("worktree")
            .arg("add")
            .arg("--detach")
            .arg(wt_str)
            .current_dir(&self.working_dir)
            .output()
            .await;

        let finished = match spawned {
            Ok(finished) => finished,
            Err(e) => {
                tracing::debug!("git not available, skipping worktree: {e}");
                return None;
            }
        };

        if !finished.status.success() {
            tracing::debug!(
                "git worktree add failed for {task_id}: {}",
                String::from_utf8_lossy(&finished.stderr).trim()
            );
            return None;
        }

        tracing::debug!(
            "Created worktree for task '{task_id}' at {}",
            wt_path.display()
        );
        Some(wt_str.to_owned())
    }

    /// Copy a completed agent's changes from its worktree into the main working
    /// dir, then remove the worktree.
    ///
    /// The set of changed paths comes from two git queries run inside the worktree:
    ///
    /// - `git diff HEAD --name-only --no-renames -z` — tracked files that differ
    ///   from HEAD (staged + unstaged). `--no-renames` makes a rename show up as a
    ///   deletion of the old path plus an addition of the new one, so the old file
    ///   is also removed from the main tree.
    /// - `git ls-files --others --exclude-standard -z` — untracked files the agent
    ///   created.
    ///
    /// `-z` yields NUL-separated, unquoted paths, so file names containing
    /// non-ASCII or special characters are read back verbatim.
    ///
    /// For every reported path: if the file exists in the worktree it is copied
    /// over the main tree's copy (creating parent directories as needed);
    /// otherwise, if it exists in the main tree, it is deleted there.
    ///
    /// This is a plain file copy, not a git merge: the worktree's version always
    /// overwrites the main tree's version and no conflict detection happens here.
    ///
    /// Returns every path reported as changed. Paths whose copy or removal failed
    /// are still included; those failures are logged as warnings.
    pub(super) async fn merge_and_remove_worktree(&self, worktree_path: &str) -> Vec<String> {
        /// Split NUL-terminated git output into owned path strings, skipping empties.
        fn nul_separated_paths(stdout: &[u8]) -> impl Iterator<Item = String> + '_ {
            stdout
                .split(|byte| *byte == 0)
                .filter(|raw| !raw.is_empty())
                .map(|raw| String::from_utf8_lossy(raw).into_owned())
        }

        // Tracked files changed vs HEAD in the worktree (staged + unstaged).
        let diff = tokio::process::Command::new("git")
            .args(["diff", "HEAD", "--name-only", "--no-renames", "-z"])
            .current_dir(worktree_path)
            .output()
            .await;

        // Untracked files the agent may have created.
        let untracked = tokio::process::Command::new("git")
            .args(["ls-files", "--others", "--exclude-standard", "-z"])
            .current_dir(worktree_path)
            .output()
            .await;

        let mut changed_files: Vec<String> = Vec::new();
        for (what, result) in [("git diff", diff), ("git ls-files", untracked)] {
            match result {
                Ok(o) => {
                    if !o.status.success() {
                        tracing::warn!(
                            "{what} exited with {} in worktree {worktree_path}: {}",
                            o.status,
                            String::from_utf8_lossy(&o.stderr).trim()
                        );
                    }
                    // Whatever was printed is still consumed, even on a failure status.
                    changed_files.extend(nul_separated_paths(&o.stdout));
                }
                Err(e) => {
                    tracing::warn!("Failed to run {what} in worktree {worktree_path}: {e}");
                }
            }
        }

        // Mirror each changed path into the main working dir by copying or deleting.
        for rel_path in &changed_files {
            let src = std::path::Path::new(worktree_path).join(rel_path);
            let dst = std::path::Path::new(&self.working_dir).join(rel_path);

            if src.exists() {
                // Present in the worktree: created or modified by the agent.
                if let Some(parent) = dst.parent() {
                    let _ = tokio::fs::create_dir_all(parent).await;
                }
                if let Err(e) = tokio::fs::copy(&src, &dst).await {
                    tracing::warn!("Failed to merge {rel_path} from worktree: {e}");
                }
            } else if dst.exists() {
                // Absent from the worktree but present in main: deleted by the agent.
                match tokio::fs::remove_file(&dst).await {
                    Ok(()) => {
                        tracing::debug!("Removed {rel_path} (deleted in agent's worktree)");
                    }
                    Err(e) => {
                        tracing::warn!("Failed to remove {rel_path} from main tree: {e}");
                    }
                }
            }
        }

        tracing::debug!(
            "Merged {} file(s) from worktree {}",
            changed_files.len(),
            worktree_path
        );

        // Remove the worktree; `--force` discards its uncommitted changes, which
        // were copied out above.
        let remove_result = tokio::process::Command::new("git")
            .args(["worktree", "remove", worktree_path, "--force"])
            .current_dir(&self.working_dir)
            .output()
            .await;

        match remove_result {
            Ok(o) if o.status.success() => {}
            Ok(o) => {
                // git ran but refused; previously this case went unnoticed.
                tracing::warn!(
                    "Failed to remove worktree {}: {}",
                    worktree_path,
                    String::from_utf8_lossy(&o.stderr).trim()
                );
            }
            Err(e) => {
                tracing::warn!("Failed to remove worktree {}: {}", worktree_path, e);
            }
        }

        changed_files
    }

    /// Apply the configured conflict resolution strategy to detected conflicts.
    ///
    /// Steps, in order:
    ///
    /// 1. Run `conflict::try_auto_merge_conflicts` over all conflicts.
    /// 2. Collect the conflicts whose `resolution` is still `None`; if there are
    ///    none, return. Otherwise report them via `AgentEvent::SwarmConflict`.
    /// 3. Apply `self.hive_config.conflict_resolution` to the still-unresolved
    ///    conflicts only — conflicts resolved in step 1 are never re-processed:
    ///    - **LastWriterWins**: look up the latest modification per file (by
    ///      timestamp), log the winning agent and mark the conflict resolved.
    ///    - **CoordinatorResolves**: ask the coordinator LLM for a decision; on
    ///      failure the conflict is left unresolved.
    ///    - **UserResolves**: skip automatic resolution — user will handle it.
    /// 4. Log recorded coordinator/user resolutions at debug level.
    ///
    /// Send errors on `event_tx` are ignored.
    pub(super) async fn resolve_conflicts(
        &self,
        conflicts: &mut [conflict::FileConflict],
        event_tx: &mpsc::UnboundedSender<AgentEvent>,
    ) {
        if conflicts.is_empty() {
            return;
        }

        // First pass: attempt auto-merge for sequential edit chains before
        // falling back to the configured resolution strategy.
        let auto_merged = conflict::try_auto_merge_conflicts(conflicts, &self.knowledge).await;
        if auto_merged > 0 {
            tracing::info!(
                auto_merged,
                remaining = conflicts.iter().filter(|c| c.resolution.is_none()).count(),
                "Auto-merged sequential conflicts"
            );
        }

        // Log a summary of all remaining unresolved conflicts for diagnostics.
        let summary = conflict::conflicts_summary(conflicts);
        if !summary.is_empty() {
            tracing::debug!(conflicts = ?summary, "Unresolved conflicts before resolution strategy");
        }

        // Only report and resolve conflicts that couldn't be auto-merged.
        let unresolved: Vec<_> = conflicts
            .iter()
            .filter(|c| c.resolution.is_none())
            .map(|c| (c.path.clone(), c.agents.clone()))
            .collect();

        if unresolved.is_empty() {
            return;
        }

        // Captured before `unresolved` is moved into the event below.
        let unresolved_count = unresolved.len();

        let _ = event_tx.send(AgentEvent::SwarmConflict {
            conflicts: unresolved,
        });

        match self.hive_config.conflict_resolution {
            ConflictResolution::UserResolves => {
                // No automatic resolution — user is expected to handle conflicts.
                // Report only what is actually left, not the auto-merged ones.
                tracing::info!(
                    "conflict_resolution=user_resolves: {} conflict(s) left for user",
                    unresolved_count
                );
            }
            ConflictResolution::LastWriterWins => {
                let _ = event_tx.send(AgentEvent::PhaseChange {
                    label: format!(
                        "{} — resolving conflicts (last-writer-wins)...",
                        self.mode_label()
                    ),
                });

                // Skip conflicts the first pass already resolved.
                for c in conflicts.iter_mut().filter(|c| c.resolution.is_none()) {
                    let mods = self.knowledge.file_modifications(&c.path).await;
                    if let Some(winner) = mods.iter().max_by_key(|m| m.timestamp) {
                        tracing::info!("last_writer_wins: {} → agent {}", c.path, winner.agent_id);
                        // NOTE(review): the outcome is recorded as `AutoMerged`, and
                        // this branch only logs the winner — it does not write any
                        // file content itself.
                        c.resolution = Some(conflict::ConflictResolutionOutcome::AutoMerged);
                    }
                }
            }
            ConflictResolution::CoordinatorResolves => {
                let _ = event_tx.send(AgentEvent::PhaseChange {
                    label: format!(
                        "{} — resolving conflicts (coordinator)...",
                        self.mode_label()
                    ),
                });

                let coord_client = self.hive_config.coordinator_client(&self.client);

                // Skip conflicts the first pass already resolved, so that no LLM
                // call is spent on them and their outcome is not overwritten.
                for c in conflicts.iter_mut().filter(|c| c.resolution.is_none()) {
                    let mods = self.knowledge.file_modifications(&c.path).await;

                    // Check if modifications can be auto-merged (sequential edits)
                    let can_auto = mods.len() == 2 && conflict::can_auto_merge(&mods[0], &mods[1]);
                    if can_auto {
                        tracing::info!("auto-merged sequential edits: {}", c.path);
                        c.resolution = Some(conflict::ConflictResolutionOutcome::AutoMerged);
                        continue;
                    }

                    // Ask coordinator LLM to decide
                    match self.coordinator_resolve_file(&coord_client, c, &mods).await {
                        Ok(explanation) => {
                            tracing::info!(path = %c.path, %explanation, "Coordinator resolved conflict");
                            c.resolution =
                                Some(conflict::ConflictResolutionOutcome::CoordinatorResolved {
                                    explanation,
                                });
                        }
                        Err(e) => {
                            tracing::warn!(
                                "coordinator failed to resolve {}: {e}; leaving unresolved",
                                c.path
                            );
                        }
                    }
                }
            }
        }

        // Log resolution outcomes for diagnostics.
        for c in conflicts.iter() {
            match &c.resolution {
                Some(conflict::ConflictResolutionOutcome::CoordinatorResolved { explanation }) => {
                    tracing::debug!(path = %c.path, explanation, "Coordinator resolution recorded");
                }
                Some(conflict::ConflictResolutionOutcome::UserResolved { choice }) => {
                    tracing::debug!(path = %c.path, choice, "User-resolved conflict recorded");
                }
                _ => {}
            }
        }
    }

    /// Read a truncated snapshot of a file for conflict resolution.
    ///
    /// `path` is used as-is when absolute; otherwise it is resolved against
    /// `working_dir`.
    ///
    /// Returns `Some(content)` holding at most 8192 bytes of the file's text, or
    /// `None` if the file cannot be read as UTF-8 text (missing, unreadable, or
    /// binary content).
    ///
    /// Truncation never splits a multi-byte character: when byte 8192 falls inside
    /// one, the snapshot ends at the preceding character boundary and is therefore
    /// slightly shorter than the limit.
    pub(super) async fn read_content_snapshot(path: &str, working_dir: &str) -> Option<String> {
        const MAX_SNAPSHOT: usize = 8192;

        let full_path = if std::path::Path::new(path).is_absolute() {
            std::path::PathBuf::from(path)
        } else {
            std::path::PathBuf::from(working_dir).join(path)
        };

        let mut content = tokio::fs::read_to_string(&full_path).await.ok()?;

        if content.len() > MAX_SNAPSHOT {
            // Slicing a `str` at a non-boundary byte offset panics, so back up to
            // the nearest boundary. Offset 0 is always a boundary, which bounds
            // the loop.
            let mut end = MAX_SNAPSHOT;
            while !content.is_char_boundary(end) {
                end -= 1;
            }
            content.truncate(end);
        }

        Some(content)
    }

    /// Have the coordinator LLM decide how a conflicting file should be resolved.
    ///
    /// One "version" section is rendered per entry in `mods`, showing the agent id,
    /// the modification type and that modification's stored content snapshot
    /// (passed through `truncate(_, 3000)`, or a placeholder when no snapshot was
    /// stored). The model therefore compares each agent's recorded version rather
    /// than whatever currently sits on disk.
    ///
    /// The request is a single non-streaming user message with temperature 0.2
    /// and `max_tokens` 500.
    ///
    /// Returns the text of the first choice, or `"No explanation provided."` when
    /// the response carries no content.
    ///
    /// # Errors
    ///
    /// A failed chat call is returned as `AgentError::Transport` carrying the
    /// underlying error's message.
    pub(super) async fn coordinator_resolve_file(
        &self,
        coord_client: &OpenAiCompatibleProvider,
        file_conflict: &conflict::FileConflict,
        mods: &[crate::agent::swarm::knowledge::FileModification],
    ) -> crate::common::Result<String> {
        // Render one markdown section per recorded modification.
        let version_sections: Vec<String> = mods
            .iter()
            .enumerate()
            .map(|(idx, modification)| {
                let preview = match modification.content_snapshot.as_deref() {
                    Some(snapshot) => truncate(snapshot, 3000),
                    None => "(snapshot not available)".to_string(),
                };
                format!(
                    "### Version {} — Agent `{}`\n\
                     Modification: {:?}\n\
                     ```\n{}\n```",
                    idx + 1,
                    modification.agent_id,
                    modification.modification_type,
                    preview,
                )
            })
            .collect();
        let versions = version_sections.join("\n\n");

        let prompt = format!(
            "You are a code merge coordinator. Multiple agents modified the same file \
             and their changes conflict.\n\n\
             **File:** `{}`\n\n\
             {}\n\n\
             Compare the versions above. Decide which version to keep, or describe how to \
             combine them. Respond with a brief explanation (1-3 sentences) of your decision.",
            file_conflict.path, versions,
        );

        let user_message = Message {
            role: "user".to_string(),
            content: Some(Content::text(prompt)),
            reasoning_content: None,
            tool_calls: None,
            tool_call_id: None,
        };

        let req = ChatRequest {
            model: coord_client.model.clone(),
            messages: vec![user_message],
            tools: None,
            tool_choice: None,
            temperature: Some(0.2),
            max_tokens: 500,
            stream: false,
            thinking_budget_tokens: None,
            reasoning_effort: None,
        };

        let resp = coord_client
            .chat(&req)
            .await
            .map_err(|e| crate::common::AgentError::Transport(e.to_string()))?;

        // Only the first choice is consulted.
        let first_content = resp
            .choices
            .first()
            .and_then(|choice| choice.message.content.as_ref());
        let explanation = match first_content {
            Some(content) => content.text_content(),
            None => "No explanation provided.".to_string(),
        };

        Ok(explanation)
    }

    /// Post-merge verification gate: runs `cargo check --message-format=short` in
    /// the working dir to validate the integrated result.
    ///
    /// Returns `None` when the working dir has no `Cargo.toml` (the gate only
    /// applies to Rust projects) or when `cargo` could not be launched. Otherwise
    /// returns a `VerificationResult` whose `passed` flag mirrors the exit status
    /// and whose `output` is stdout and stderr joined by a newline.
    ///
    /// A `PhaseChange` event is emitted before the check starts and again when it
    /// passes; no event is emitted for a failing check. Send errors are ignored.
    pub(super) async fn verify_merge(
        &self,
        event_tx: &mpsc::UnboundedSender<AgentEvent>,
    ) -> Option<VerificationResult> {
        // Without a manifest there is nothing for cargo to check.
        let manifest = std::path::Path::new(&self.working_dir).join("Cargo.toml");
        if !manifest.exists() {
            return None;
        }

        let _ = event_tx.send(AgentEvent::PhaseChange {
            label: format!(
                "{} — running verification gate (cargo check)...",
                self.mode_label()
            ),
        });

        let spawned = tokio::process::Command::new("cargo")
            .args(["check", "--message-format=short"])
            .current_dir(&self.working_dir)
            .output()
            .await;

        let finished = match spawned {
            Ok(finished) => finished,
            Err(e) => {
                tracing::warn!("Verification gate failed to run: {e}");
                return None;
            }
        };

        let passed = finished.status.success();
        if passed {
            let _ = event_tx.send(AgentEvent::PhaseChange {
                label: format!(
                    "{} — ✓ verification passed (cargo check)",
                    self.mode_label()
                ),
            });
        }

        let combined = format!(
            "{}\n{}",
            String::from_utf8_lossy(&finished.stdout),
            String::from_utf8_lossy(&finished.stderr)
        );

        Some(VerificationResult {
            passed,
            output: combined,
            command: "cargo check".to_string(),
        })
    }

    /// Render the run's outcome as one markdown summary string.
    ///
    /// Sections appear in this order, joined by newlines:
    ///
    /// 1. A header with the total, successful and failed agent counts.
    /// 2. One section per successful agent (response passed through
    ///    `truncate(_, 500)`, modified files, tool-call count).
    /// 3. A bullet list of failed agents, present only when at least one failed.
    ///    They are reported under an "excluded from merge" heading.
    /// 4. A bullet list of conflicts with their resolution, when any exist.
    /// 5. The verification gate result, when one is supplied.
    pub(super) fn merge_results(
        &self,
        results: &[SubagentResult],
        conflicts: &[conflict::FileConflict],
        verification: Option<&VerificationResult>,
    ) -> String {
        /// Comma-separated list of entries, or `"none"` for an empty list.
        fn list_or_none<S: std::borrow::Borrow<str>>(items: &[S]) -> String {
            if items.is_empty() {
                "none".to_string()
            } else {
                items.join(", ")
            }
        }

        let (successful, failed): (Vec<&SubagentResult>, Vec<&SubagentResult>) =
            results.iter().partition(|r| r.success);

        let mut parts = vec![format!(
            "## {} Execution Summary\n\n{} agents completed ({} ok, {} failed).",
            self.mode_label(),
            results.len(),
            successful.len(),
            failed.len(),
        )];

        // Successful agents each get their own section.
        parts.extend(successful.iter().map(|ok| {
            format!(
                "\n### Agent `{}` [OK]\n\n{}\n\nFiles modified: {}\nTool calls: {}",
                ok.id,
                truncate(&ok.response, 500),
                list_or_none(&ok.modified_files),
                ok.tool_calls,
            )
        }));

        // Failed agents are listed separately as compact bullets.
        if !failed.is_empty() {
            parts.push("\n### ⚠ Failed Agents (excluded from merge)\n".to_string());
            parts.extend(failed.iter().map(|bad| {
                format!(
                    "- `{}`: {} (files: {}, tool calls: {})",
                    bad.id,
                    truncate(&bad.response, 200),
                    list_or_none(&bad.modified_files),
                    bad.tool_calls,
                )
            }));
        }

        if !conflicts.is_empty() {
            parts.push("\n### Conflicts Detected\n".to_string());
            for fc in conflicts {
                // Unresolved conflicts get no suffix at all.
                let resolution_note = match fc.resolution.as_ref() {
                    Some(r) => format!(" (resolved: {:?})", r),
                    None => String::new(),
                };
                parts.push(format!(
                    "- `{}` modified by: {}{}",
                    fc.path,
                    fc.agents.join(", "),
                    resolution_note
                ));
            }
        }

        // Verification gate result
        if let Some(vr) = verification {
            let verdict = if vr.passed {
                "✓ PASSED"
            } else {
                "✗ FAILED"
            };
            parts.push(format!(
                "\n### Verification Gate ({})\n\n{}: {}",
                vr.command,
                verdict,
                truncate(&vr.output, 500),
            ));
        }

        parts.join("\n")
    }
}