forge_core_local_deployment/
container.rs

1use std::{
2    collections::HashMap,
3    io,
4    path::{Path, PathBuf},
5    sync::{Arc, atomic::AtomicUsize},
6    time::Duration,
7};
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use command_group::AsyncGroupChild;
12use forge_core_db::{
13    DBService,
14    models::{
15        draft::{Draft, DraftType},
16        execution_process::{
17            ExecutionContext, ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus,
18        },
19        execution_run::ExecutionRun,
20        executor_session::ExecutorSession,
21        image::TaskImage,
22        merge::Merge,
23        project::Project,
24        task::{Task, TaskStatus},
25        task_attempt::TaskAttempt,
26    },
27};
28use forge_core_deployment::DeploymentError;
29use forge_core_executors::{
30    actions::{Executable, ExecutorAction},
31    approvals::{ExecutorApprovalService, NoopExecutorApprovalService},
32    executors::BaseCodingAgent,
33    logs::{
34        NormalizedEntryType,
35        utils::{
36            ConversationPatch,
37            patch::{escape_json_pointer_segment, extract_normalized_entry_from_patch},
38        },
39    },
40};
41use forge_core_services::services::{
42    analytics::AnalyticsContext,
43    approvals::{Approvals, executor_approvals::ExecutorApprovalBridge},
44    config::Config,
45    container::{ContainerError, ContainerRef, ContainerService},
46    diff_stream::{self, DiffStreamHandle},
47    git::{Commit, DiffTarget, GitService},
48    image::ImageService,
49    notification::NotificationService,
50    worktree_manager::WorktreeManager,
51};
52use forge_core_utils::{
53    log_msg::LogMsg,
54    msg_store::MsgStore,
55    text::{git_branch_id, short_uuid},
56};
57use futures::{FutureExt, StreamExt, TryStreamExt, stream::select};
58use serde_json::json;
59use tokio::{sync::RwLock, task::JoinHandle};
60use tokio_util::io::ReaderStream;
61use uuid::Uuid;
62
63use crate::command;
64
65#[derive(Clone)]
66pub struct LocalContainerService {
67    db: DBService,
68    child_store: Arc<RwLock<HashMap<Uuid, Arc<RwLock<AsyncGroupChild>>>>>,
69    msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
70    config: Arc<RwLock<Config>>,
71    git: GitService,
72    image_service: ImageService,
73    analytics: Option<AnalyticsContext>,
74    approvals: Approvals,
75}
76
77impl LocalContainerService {
78    pub fn new(
79        db: DBService,
80        msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
81        config: Arc<RwLock<Config>>,
82        git: GitService,
83        image_service: ImageService,
84        analytics: Option<AnalyticsContext>,
85        approvals: Approvals,
86    ) -> Self {
87        let child_store = Arc::new(RwLock::new(HashMap::new()));
88
89        LocalContainerService {
90            db,
91            child_store,
92            msg_stores,
93            config,
94            git,
95            image_service,
96            analytics,
97            approvals,
98        }
99    }
100
101    pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
102        let map = self.child_store.read().await;
103        map.get(id).cloned()
104    }
105
106    pub async fn add_child_to_store(&self, id: Uuid, exec: AsyncGroupChild) {
107        let mut map = self.child_store.write().await;
108        map.insert(id, Arc::new(RwLock::new(exec)));
109    }
110
111    pub async fn remove_child_from_store(&self, id: &Uuid) {
112        let mut map = self.child_store.write().await;
113        map.remove(id);
114    }
115
116    /// A context is finalized when
117    /// - The next action is None (no follow-up actions)
118    /// - The run reason is not DevServer
119    fn should_finalize(ctx: &ExecutionContext) -> bool {
120        ctx.execution_process
121            .executor_action()
122            .unwrap()
123            .next_action
124            .is_none()
125            && (!matches!(
126                ctx.execution_process.run_reason,
127                ExecutionProcessRunReason::DevServer
128            ))
129    }
130
131    /// Finalize task execution by updating status to InReview and sending notifications
132    async fn finalize_task(db: &DBService, config: &Arc<RwLock<Config>>, ctx: &ExecutionContext) {
133        if let Err(e) = Task::update_status(&db.pool, ctx.task.id, TaskStatus::InReview).await {
134            tracing::error!("Failed to update task status to InReview: {e}");
135        }
136        let notify_cfg = config.read().await.notifications.clone();
137        NotificationService::notify_execution_halted(notify_cfg, ctx).await;
138    }
139
140    /// Defensively check for externally deleted worktrees and mark them as deleted in the database
141    async fn check_externally_deleted_worktrees(db: &DBService) -> Result<(), DeploymentError> {
142        let active_attempts = TaskAttempt::find_by_worktree_deleted(&db.pool).await?;
143        tracing::debug!(
144            "Checking {} active worktrees for external deletion...",
145            active_attempts.len()
146        );
147        for (attempt_id, worktree_path) in active_attempts {
148            // Check if worktree directory exists
149            if !std::path::Path::new(&worktree_path).exists() {
150                // Worktree was deleted externally, mark as deleted in database
151                if let Err(e) = TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await {
152                    tracing::error!(
153                        "Failed to mark externally deleted worktree as deleted for attempt {}: {}",
154                        attempt_id,
155                        e
156                    );
157                } else {
158                    tracing::info!(
159                        "Marked externally deleted worktree as deleted for attempt {} (path: {})",
160                        attempt_id,
161                        worktree_path
162                    );
163                }
164            }
165        }
166        Ok(())
167    }
168
169    /// Find and delete orphaned worktrees that don't correspond to any task attempts
170    async fn cleanup_orphaned_worktrees(&self) {
171        // Check if orphan cleanup is disabled via environment variable
172        if std::env::var("DISABLE_WORKTREE_ORPHAN_CLEANUP").is_ok() {
173            tracing::debug!(
174                "Orphan worktree cleanup is disabled via DISABLE_WORKTREE_ORPHAN_CLEANUP environment variable"
175            );
176            return;
177        }
178        let worktree_base_dir = WorktreeManager::get_worktree_base_dir();
179        if !worktree_base_dir.exists() {
180            tracing::debug!(
181                "Worktree base directory {} does not exist, skipping orphan cleanup",
182                worktree_base_dir.display()
183            );
184            return;
185        }
186        let entries = match std::fs::read_dir(&worktree_base_dir) {
187            Ok(entries) => entries,
188            Err(e) => {
189                tracing::error!(
190                    "Failed to read worktree base directory {}: {}",
191                    worktree_base_dir.display(),
192                    e
193                );
194                return;
195            }
196        };
197        for entry in entries {
198            let entry = match entry {
199                Ok(entry) => entry,
200                Err(e) => {
201                    tracing::warn!("Failed to read directory entry: {}", e);
202                    continue;
203                }
204            };
205            let path = entry.path();
206            // Only process directories
207            if !path.is_dir() {
208                continue;
209            }
210
211            let worktree_path_str = path.to_string_lossy().to_string();
212            // Check if worktree is referenced by either TaskAttempt OR ExecutionRun
213            let task_exists =
214                TaskAttempt::container_ref_exists(&self.db().pool, &worktree_path_str)
215                    .await
216                    .unwrap_or(true);
217            let run_exists =
218                ExecutionRun::container_ref_exists(&self.db().pool, &worktree_path_str)
219                    .await
220                    .unwrap_or(true);
221
222            if !task_exists && !run_exists {
223                // This is an orphaned worktree - delete it
224                tracing::info!("Found orphaned worktree: {}", worktree_path_str);
225                if let Err(e) = WorktreeManager::cleanup_worktree(&path, None).await {
226                    tracing::error!(
227                        "Failed to remove orphaned worktree {}: {}",
228                        worktree_path_str,
229                        e
230                    );
231                } else {
232                    tracing::info!(
233                        "Successfully removed orphaned worktree: {}",
234                        worktree_path_str
235                    );
236                }
237            }
238        }
239    }
240
241    pub async fn cleanup_expired_attempt(
242        db: &DBService,
243        attempt_id: Uuid,
244        worktree_path: PathBuf,
245        git_repo_path: PathBuf,
246    ) -> Result<(), DeploymentError> {
247        WorktreeManager::cleanup_worktree(&worktree_path, Some(&git_repo_path)).await?;
248        // Mark worktree as deleted in database after successful cleanup
249        TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await?;
250        tracing::info!("Successfully marked worktree as deleted for attempt {attempt_id}",);
251        Ok(())
252    }
253
254    pub async fn cleanup_expired_attempts(db: &DBService) -> Result<(), DeploymentError> {
255        let expired_attempts = TaskAttempt::find_expired_for_cleanup(&db.pool).await?;
256        if expired_attempts.is_empty() {
257            tracing::debug!("No expired worktrees found");
258            return Ok(());
259        }
260        tracing::info!(
261            "Found {} expired worktrees to clean up",
262            expired_attempts.len()
263        );
264        for (attempt_id, worktree_path, git_repo_path) in expired_attempts {
265            Self::cleanup_expired_attempt(
266                db,
267                attempt_id,
268                PathBuf::from(worktree_path),
269                PathBuf::from(git_repo_path),
270            )
271            .await
272            .unwrap_or_else(|e| {
273                tracing::error!("Failed to clean up expired attempt {attempt_id}: {e}",);
274            });
275        }
276        Ok(())
277    }
278
279    pub async fn spawn_worktree_cleanup(&self) {
280        let db = self.db.clone();
281        let mut cleanup_interval = tokio::time::interval(tokio::time::Duration::from_secs(1800)); // 30 minutes
282        self.cleanup_orphaned_worktrees().await;
283        tokio::spawn(async move {
284            loop {
285                cleanup_interval.tick().await;
286                tracing::info!("Starting periodic worktree cleanup...");
287                Self::check_externally_deleted_worktrees(&db)
288                    .await
289                    .unwrap_or_else(|e| {
290                        tracing::error!("Failed to check externally deleted worktrees: {}", e);
291                    });
292                Self::cleanup_expired_attempts(&db)
293                    .await
294                    .unwrap_or_else(|e| {
295                        tracing::error!("Failed to clean up expired worktree attempts: {}", e)
296                    });
297            }
298        });
299    }
300
301    /// Spawn a background task that polls the child process for completion and
302    /// cleans up the execution entry when it exits.
303    pub fn spawn_exit_monitor(
304        &self,
305        exec_id: &Uuid,
306        exit_signal: Option<tokio::sync::oneshot::Receiver<()>>,
307    ) -> JoinHandle<()> {
308        let exec_id = *exec_id;
309        let child_store = self.child_store.clone();
310        let msg_stores = self.msg_stores.clone();
311        let db = self.db.clone();
312        let config = self.config.clone();
313        let container = self.clone();
314        let analytics = self.analytics.clone();
315
316        let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);
317
318        tokio::spawn(async move {
319            let mut exit_signal_future = exit_signal
320                .map(|rx| rx.map(|_| ()).boxed()) // wait for signal
321                .unwrap_or_else(|| std::future::pending::<()>().boxed()); // no signal, stall forever
322
323            let status_result: std::io::Result<std::process::ExitStatus>;
324
325            // Wait for process to exit, or exit signal from executor
326            tokio::select! {
327                // Exit signal.
328                // Some coding agent processes do not automatically exit after processing the user request; instead the executor
329                // signals when processing has finished to gracefully kill the process.
330                _ = &mut exit_signal_future => {
331                    // Executor signaled completion: kill group and remember to force Completed(0)
332                    if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
333                        let mut child = child_lock.write().await ;
334                        if let Err(err) = command::kill_process_group(&mut child).await {
335                            tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
336                        }
337                    }
338                    status_result = Ok(success_exit_status());
339                }
340                // Process exit
341                exit_status_result = &mut process_exit_rx => {
342                    status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
343                }
344            }
345
346            let (exit_code, status) = match status_result {
347                Ok(exit_status) => {
348                    let code = exit_status.code().unwrap_or(-1) as i64;
349                    let status = if exit_status.success() {
350                        ExecutionProcessStatus::Completed
351                    } else {
352                        ExecutionProcessStatus::Failed
353                    };
354                    (Some(code), status)
355                }
356                Err(_) => (None, ExecutionProcessStatus::Failed),
357            };
358
359            if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
360                && let Err(e) = ExecutionProcess::update_completion(
361                    &db.pool,
362                    exec_id,
363                    status.clone(),
364                    exit_code,
365                )
366                .await
367            {
368                tracing::error!("Failed to update execution process completion: {}", e);
369            }
370
371            // Track task_attempt_failed event for non-zero exit codes (async, non-blocking)
372            if matches!(status, ExecutionProcessStatus::Failed)
373                && let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await
374                && config.read().await.analytics_enabled == Some(true)
375                && matches!(
376                    &ctx.execution_process.run_reason,
377                    ExecutionProcessRunReason::CodingAgent
378                )
379                && let Some(analytics) = &analytics
380            {
381                analytics.analytics_service.track_event(
382                    &analytics.user_id,
383                    "task_attempt_failed",
384                    Some(json!({
385                        "task_id": ctx.task.id.to_string(),
386                        "project_id": ctx.task.project_id.to_string(),
387                        "attempt_id": ctx.task_attempt.id.to_string(),
388                        "exit_code": exit_code,
389                        "executor": ctx.execution_process.executor_action().ok()
390                            .and_then(|action| match &action.typ {
391                                forge_core_executors::actions::ExecutorActionType::CodingAgentInitialRequest(req) =>
392                                    Some(req.executor_profile_id.executor),
393                                forge_core_executors::actions::ExecutorActionType::CodingAgentFollowUpRequest(req) =>
394                                    Some(req.executor_profile_id.executor),
395                                _ => None,
396                            })
397                            .map(|e| e.to_string())
398                            .unwrap_or_else(|| "unknown".to_string()),
399                    })),
400                );
401            }
402
403            if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
404                // Update executor session summary if available
405                if let Err(e) = container.update_executor_session_summary(&exec_id).await {
406                    tracing::warn!("Failed to update executor session summary: {}", e);
407                }
408
409                let success = matches!(
410                    ctx.execution_process.status,
411                    ExecutionProcessStatus::Completed
412                ) && exit_code == Some(0);
413
414                let cleanup_done = matches!(
415                    ctx.execution_process.run_reason,
416                    ExecutionProcessRunReason::CleanupScript
417                ) && !matches!(
418                    ctx.execution_process.status,
419                    ExecutionProcessStatus::Running
420                );
421
422                if success || cleanup_done {
423                    // Commit changes (if any) and get feedback about whether changes were made
424                    let changes_committed = match container.try_commit_changes(&ctx).await {
425                        Ok(committed) => committed,
426                        Err(e) => {
427                            tracing::error!("Failed to commit changes after execution: {}", e);
428                            // Treat commit failures as if changes were made to be safe
429                            true
430                        }
431                    };
432
433                    let should_start_next = if matches!(
434                        ctx.execution_process.run_reason,
435                        ExecutionProcessRunReason::CodingAgent
436                    ) {
437                        changes_committed
438                    } else {
439                        true
440                    };
441
442                    if should_start_next {
443                        // If the process exited successfully, start the next action
444                        if let Err(e) = container.try_start_next_action(&ctx).await {
445                            tracing::error!("Failed to start next action after completion: {}", e);
446                        }
447                    } else {
448                        tracing::info!(
449                            "Skipping cleanup script for task attempt {} - no changes made by coding agent",
450                            ctx.task_attempt.id
451                        );
452
453                        // Manually finalize task since we're bypassing normal execution flow
454                        Self::finalize_task(&db, &config, &ctx).await;
455                    }
456                }
457
458                if Self::should_finalize(&ctx) {
459                    Self::finalize_task(&db, &config, &ctx).await;
460                    // After finalization, check if a queued follow-up exists and start it
461                    if let Err(e) = container.try_consume_queued_followup(&ctx).await {
462                        tracing::error!(
463                            "Failed to start queued follow-up for attempt {}: {}",
464                            ctx.task_attempt.id,
465                            e
466                        );
467                    }
468                }
469
470                // Fire analytics event when CodingAgent execution has finished
471                if config.read().await.analytics_enabled == Some(true)
472                    && matches!(
473                        &ctx.execution_process.run_reason,
474                        ExecutionProcessRunReason::CodingAgent
475                    )
476                    && let Some(analytics) = &analytics
477                {
478                    analytics.analytics_service.track_event(&analytics.user_id, "task_attempt_finished", Some(json!({
479                        "task_id": ctx.task.id.to_string(),
480                        "project_id": ctx.task.project_id.to_string(),
481                        "attempt_id": ctx.task_attempt.id.to_string(),
482                        "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed),
483                        "exit_code": ctx.execution_process.exit_code,
484                    })));
485                }
486            }
487
488            // Now that commit/next-action/finalization steps for this process are complete,
489            // capture the HEAD OID as the definitive "after" state (best-effort).
490            if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
491                let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt);
492                if let Ok(head) = container.git().get_head_info(&worktree_dir)
493                    && let Err(e) =
494                        ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
495                            .await
496                {
497                    tracing::warn!("Failed to update after_head_commit for {}: {}", exec_id, e);
498                }
499            }
500
501            // Cleanup msg store
502            if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
503                msg_arc.push_finished();
504                tokio::time::sleep(Duration::from_millis(50)).await; // Wait for the finish message to propogate
505                match Arc::try_unwrap(msg_arc) {
506                    Ok(inner) => drop(inner),
507                    Err(arc) => tracing::error!(
508                        "There are still {} strong Arcs to MsgStore for {}",
509                        Arc::strong_count(&arc),
510                        exec_id
511                    ),
512                }
513            }
514
515            // Cleanup child handle
516            child_store.write().await.remove(&exec_id);
517        })
518    }
519
520    pub fn spawn_os_exit_watcher(
521        &self,
522        exec_id: Uuid,
523    ) -> tokio::sync::oneshot::Receiver<std::io::Result<std::process::ExitStatus>> {
524        let (tx, rx) = tokio::sync::oneshot::channel::<std::io::Result<std::process::ExitStatus>>();
525        let child_store = self.child_store.clone();
526        tokio::spawn(async move {
527            loop {
528                let child_lock = {
529                    let map = child_store.read().await;
530                    map.get(&exec_id).cloned()
531                };
532                if let Some(child_lock) = child_lock {
533                    let mut child_handler = child_lock.write().await;
534                    match child_handler.try_wait() {
535                        Ok(Some(status)) => {
536                            let _ = tx.send(Ok(status));
537                            break;
538                        }
539                        Ok(None) => {}
540                        Err(e) => {
541                            let _ = tx.send(Err(e));
542                            break;
543                        }
544                    }
545                } else {
546                    let _ = tx.send(Err(io::Error::other(format!(
547                        "Child handle missing for {exec_id}"
548                    ))));
549                    break;
550                }
551                tokio::time::sleep(Duration::from_millis(250)).await;
552            }
553        });
554        rx
555    }
556
557    pub fn dir_name_from_task_attempt(attempt_id: &Uuid, task_title: &str) -> String {
558        let task_title_id = git_branch_id(task_title);
559        format!("{}-{}", short_uuid(attempt_id), task_title_id)
560    }
561
562    async fn track_child_msgs_in_store(&self, id: Uuid, child: &mut AsyncGroupChild) {
563        let store = Arc::new(MsgStore::new());
564
565        let out = child.inner().stdout.take().expect("no stdout");
566        let err = child.inner().stderr.take().expect("no stderr");
567
568        // Map stdout bytes -> LogMsg::Stdout
569        let out = ReaderStream::new(out)
570            .map_ok(|chunk| LogMsg::Stdout(String::from_utf8_lossy(&chunk).into_owned()));
571
572        // Map stderr bytes -> LogMsg::Stderr
573        let err = ReaderStream::new(err)
574            .map_ok(|chunk| LogMsg::Stderr(String::from_utf8_lossy(&chunk).into_owned()));
575
576        // If you have a JSON Patch source, map it to LogMsg::JsonPatch too, then select all three.
577
578        // Merge and forward into the store
579        let merged = select(out, err); // Stream<Item = Result<LogMsg, io::Error>>
580        let debounced = forge_core_utils::stream_ext::debounce_logs(merged);
581        store.clone().spawn_forwarder(debounced);
582
583        let mut map = self.msg_stores().write().await;
584        map.insert(id, store);
585    }
586
587    /// Get the worktree path for a task attempt
588    #[allow(dead_code)]
589    async fn get_worktree_path(
590        &self,
591        task_attempt: &TaskAttempt,
592    ) -> Result<PathBuf, ContainerError> {
593        let container_ref = self.ensure_container_exists(task_attempt).await?;
594        let worktree_dir = PathBuf::from(&container_ref);
595
596        if !worktree_dir.exists() {
597            return Err(ContainerError::Other(anyhow!(
598                "Worktree directory not found"
599            )));
600        }
601
602        Ok(worktree_dir)
603    }
604    /// Get the project repository path for a task attempt
605    async fn get_project_repo_path(
606        &self,
607        task_attempt: &TaskAttempt,
608    ) -> Result<PathBuf, ContainerError> {
609        let project_repo_path = task_attempt
610            .parent_task(&self.db().pool)
611            .await?
612            .ok_or(ContainerError::Other(anyhow!("Parent task not found")))?
613            .parent_project(&self.db().pool)
614            .await?
615            .ok_or(ContainerError::Other(anyhow!("Parent project not found")))?
616            .git_repo_path;
617
618        Ok(project_repo_path)
619    }
620
621    /// Create a diff log stream for merged attempts (never changes) for WebSocket
622    fn create_merged_diff_stream(
623        &self,
624        project_repo_path: &Path,
625        merge_commit_id: &str,
626        stats_only: bool,
627    ) -> Result<DiffStreamHandle, ContainerError> {
628        let diffs = self.git().get_diffs(
629            DiffTarget::Commit {
630                repo_path: project_repo_path,
631                commit_sha: merge_commit_id,
632            },
633            None,
634        )?;
635
636        let cum = Arc::new(AtomicUsize::new(0));
637        let diffs: Vec<_> = diffs
638            .into_iter()
639            .map(|mut d| {
640                diff_stream::apply_stream_omit_policy(&mut d, &cum, stats_only);
641                d
642            })
643            .collect();
644
645        let stream = futures::stream::iter(diffs.into_iter().map(|diff| {
646            let entry_index = GitService::diff_path(&diff);
647            let patch =
648                ConversationPatch::add_diff(escape_json_pointer_segment(&entry_index), diff);
649            Ok::<_, std::io::Error>(LogMsg::JsonPatch(patch))
650        }))
651        .chain(futures::stream::once(async {
652            Ok::<_, std::io::Error>(LogMsg::Finished)
653        }))
654        .boxed();
655
656        Ok(diff_stream::DiffStreamHandle::new(stream, None))
657    }
658
659    /// Create a live diff log stream for ongoing attempts for WebSocket
660    /// Returns a stream that owns the filesystem watcher - when dropped, watcher is cleaned up
661    async fn create_live_diff_stream(
662        &self,
663        worktree_path: &Path,
664        base_commit: &Commit,
665        stats_only: bool,
666    ) -> Result<DiffStreamHandle, ContainerError> {
667        diff_stream::create(
668            self.git().clone(),
669            worktree_path.to_path_buf(),
670            base_commit.clone(),
671            stats_only,
672        )
673        .await
674        .map_err(|e| ContainerError::Other(anyhow!("{e}")))
675    }
676}
677
678fn success_exit_status() -> std::process::ExitStatus {
679    #[cfg(unix)]
680    {
681        use std::os::unix::process::ExitStatusExt;
682        ExitStatusExt::from_raw(0)
683    }
684    #[cfg(windows)]
685    {
686        use std::os::windows::process::ExitStatusExt;
687        ExitStatusExt::from_raw(0)
688    }
689}
690
691#[async_trait]
692impl ContainerService for LocalContainerService {
693    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
694        &self.msg_stores
695    }
696
697    fn db(&self) -> &DBService {
698        &self.db
699    }
700
701    fn git(&self) -> &GitService {
702        &self.git
703    }
704
705    async fn git_branch_prefix(&self) -> String {
706        self.config.read().await.git_branch_prefix.clone()
707    }
708
709    fn task_attempt_to_current_dir(&self, task_attempt: &TaskAttempt) -> PathBuf {
710        PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default())
711    }
712    /// Create a container
713    async fn create(&self, task_attempt: &TaskAttempt) -> Result<ContainerRef, ContainerError> {
714        let task = task_attempt
715            .parent_task(&self.db.pool)
716            .await?
717            .ok_or(sqlx::Error::RowNotFound)?;
718
719        let worktree_dir_name =
720            LocalContainerService::dir_name_from_task_attempt(&task_attempt.id, &task.title);
721        let worktree_path = WorktreeManager::get_worktree_base_dir().join(&worktree_dir_name);
722
723        let project = task
724            .parent_project(&self.db.pool)
725            .await?
726            .ok_or(sqlx::Error::RowNotFound)?;
727
728        // Query forge_task_attempt_config to check if we should use worktree
729        // Default to true for backward compatibility if row doesn't exist
730        let use_worktree = sqlx::query_scalar::<_, bool>(
731            "SELECT COALESCE((SELECT use_worktree FROM forge_task_attempt_config WHERE task_attempt_id = ?), 1)"
732        )
733        .bind(task_attempt.id)
734        .fetch_one(&self.db.pool)
735        .await
736        .unwrap_or(true); // Default to true if query fails
737
738        let container_ref_path = if use_worktree {
739            // Create worktree for isolated work
740            WorktreeManager::create_worktree(
741                &project.git_repo_path,
742                &task_attempt.branch,
743                &worktree_path,
744                &task_attempt.target_branch,
745                true, // create new branch
746            )
747            .await?;
748
749            // Copy files specified in the project's copy_files field
750            if let Some(copy_files) = &project.copy_files
751                && !copy_files.trim().is_empty()
752            {
753                self.copy_project_files(&project.git_repo_path, &worktree_path, copy_files)
754                    .await
755                    .unwrap_or_else(|e| {
756                        tracing::warn!("Failed to copy project files: {}", e);
757                    });
758            }
759
760            // Copy task images from cache to worktree
761            if let Err(e) = self
762                .image_service
763                .copy_images_by_task_to_worktree(&worktree_path, task.id)
764                .await
765            {
766                tracing::warn!("Failed to copy task images to worktree: {}", e);
767            }
768
769            worktree_path
770        } else {
771            // No worktree - use project's main repository directly
772            PathBuf::from(&project.git_repo_path)
773        };
774
775        // Update both container_ref and branch in the database
776        TaskAttempt::update_container_ref(
777            &self.db.pool,
778            task_attempt.id,
779            &container_ref_path.to_string_lossy(),
780        )
781        .await?;
782
783        Ok(container_ref_path.to_string_lossy().to_string())
784    }
785
786    async fn delete_inner(&self, task_attempt: &TaskAttempt) -> Result<(), ContainerError> {
787        // cleanup the container, here that means deleting the worktree
788        let task = task_attempt
789            .parent_task(&self.db.pool)
790            .await?
791            .ok_or(sqlx::Error::RowNotFound)?;
792        let git_repo_path = match Project::find_by_id(&self.db.pool, task.project_id).await {
793            Ok(Some(project)) => Some(project.git_repo_path.clone()),
794            Ok(None) => None,
795            Err(e) => {
796                tracing::error!("Failed to fetch project {}: {}", task.project_id, e);
797                None
798            }
799        };
800        WorktreeManager::cleanup_worktree(
801            &PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
802            git_repo_path.as_deref(),
803        )
804        .await
805        .unwrap_or_else(|e| {
806            tracing::warn!(
807                "Failed to clean up worktree for task attempt {}: {}",
808                task_attempt.id,
809                e
810            );
811        });
812        Ok(())
813    }
814
815    async fn ensure_container_exists(
816        &self,
817        task_attempt: &TaskAttempt,
818    ) -> Result<ContainerRef, ContainerError> {
819        // Get required context
820        let task = task_attempt
821            .parent_task(&self.db.pool)
822            .await?
823            .ok_or(sqlx::Error::RowNotFound)?;
824
825        let project = task
826            .parent_project(&self.db.pool)
827            .await?
828            .ok_or(sqlx::Error::RowNotFound)?;
829
830        let container_ref = task_attempt.container_ref.as_ref().ok_or_else(|| {
831            ContainerError::Other(anyhow!("Container ref not found for task attempt"))
832        })?;
833        let worktree_path = PathBuf::from(container_ref);
834
835        // Check if this task uses worktrees
836        // Default to true for backward compatibility if row doesn't exist
837        let use_worktree = sqlx::query_scalar::<_, bool>(
838            "SELECT COALESCE((SELECT use_worktree FROM forge_task_attempt_config WHERE task_attempt_id = ?), 1)"
839        )
840        .bind(task_attempt.id)
841        .fetch_one(&self.db.pool)
842        .await
843        .unwrap_or(true); // Default to true if query fails
844
845        // Only ensure worktree exists if actually using worktrees
846        if use_worktree {
847            WorktreeManager::ensure_worktree_exists(
848                &project.git_repo_path,
849                &task_attempt.branch,
850                &worktree_path,
851            )
852            .await?;
853        }
854
855        Ok(container_ref.to_string())
856    }
857
858    async fn is_container_clean(&self, task_attempt: &TaskAttempt) -> Result<bool, ContainerError> {
859        if let Some(container_ref) = &task_attempt.container_ref {
860            // If container_ref is set, check if the worktree exists
861            let path = PathBuf::from(container_ref);
862            if path.exists() {
863                self.git().is_worktree_clean(&path).map_err(|e| e.into())
864            } else {
865                return Ok(true); // No worktree means it's clean
866            }
867        } else {
868            return Ok(true); // No container_ref means no worktree, so it's clean
869        }
870    }
871
872    async fn start_execution_inner(
873        &self,
874        task_attempt: &TaskAttempt,
875        execution_process: &ExecutionProcess,
876        executor_action: &ExecutorAction,
877    ) -> Result<(), ContainerError> {
878        // Get the worktree path
879        let container_ref = task_attempt
880            .container_ref
881            .as_ref()
882            .ok_or(ContainerError::Other(anyhow!(
883                "Container ref not found for task attempt"
884            )))?;
885        let current_dir = PathBuf::from(container_ref);
886
887        let approvals_service: Arc<dyn ExecutorApprovalService> =
888            match executor_action.base_executor() {
889                Some(BaseCodingAgent::Codex) | Some(BaseCodingAgent::ClaudeCode) => {
890                    ExecutorApprovalBridge::new(
891                        self.approvals.clone(),
892                        self.db.clone(),
893                        execution_process.id,
894                    )
895                }
896                _ => Arc::new(NoopExecutorApprovalService {}),
897            };
898
899        // Create the child and stream, add to execution tracker
900        let mut spawned = executor_action
901            .spawn(&current_dir, approvals_service)
902            .await?;
903
904        self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
905            .await;
906
907        self.add_child_to_store(execution_process.id, spawned.child)
908            .await;
909
910        // Spawn unified exit monitor: watches OS exit and optional executor signal
911        let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal);
912
913        Ok(())
914    }
915
916    async fn stop_execution(
917        &self,
918        execution_process: &ExecutionProcess,
919        status: ExecutionProcessStatus,
920    ) -> Result<(), ContainerError> {
921        let child = self
922            .get_child_from_store(&execution_process.id)
923            .await
924            .ok_or_else(|| {
925                ContainerError::Other(anyhow!("Child process not found for execution"))
926            })?;
927        let exit_code = if status == ExecutionProcessStatus::Completed {
928            Some(0)
929        } else {
930            None
931        };
932
933        ExecutionProcess::update_completion(&self.db.pool, execution_process.id, status, exit_code)
934            .await?;
935
936        // Kill the child process and remove from the store
937        {
938            let mut child_guard = child.write().await;
939            if let Err(e) = command::kill_process_group(&mut child_guard).await {
940                tracing::error!(
941                    "Failed to stop execution process {}: {}",
942                    execution_process.id,
943                    e
944                );
945                return Err(e);
946            }
947        }
948        self.remove_child_from_store(&execution_process.id).await;
949
950        // Mark the process finished in the MsgStore
951        if let Some(msg) = self.msg_stores.write().await.remove(&execution_process.id) {
952            msg.push_finished();
953        }
954
955        // Update task status to InReview when execution is stopped
956        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await
957            && !matches!(
958                ctx.execution_process.run_reason,
959                ExecutionProcessRunReason::DevServer
960            )
961            && let Err(e) =
962                Task::update_status(&self.db.pool, ctx.task.id, TaskStatus::InReview).await
963        {
964            tracing::error!("Failed to update task status to InReview: {e}");
965        }
966
967        tracing::debug!(
968            "Execution process {} stopped successfully",
969            execution_process.id
970        );
971
972        // Record after-head commit OID (best-effort)
973        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await {
974            let worktree = self.task_attempt_to_current_dir(&ctx.task_attempt);
975            if let Ok(head) = self.git().get_head_info(&worktree) {
976                let _ = ExecutionProcess::update_after_head_commit(
977                    &self.db.pool,
978                    execution_process.id,
979                    &head.oid,
980                )
981                .await;
982            }
983        }
984
985        Ok(())
986    }
987
988    async fn stream_diff(
989        &self,
990        task_attempt: &TaskAttempt,
991        stats_only: bool,
992    ) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
993    {
994        let project_repo_path = self.get_project_repo_path(task_attempt).await?;
995        let latest_merge =
996            Merge::find_latest_by_task_attempt_id(&self.db.pool, task_attempt.id).await?;
997
998        let is_ahead = if let Ok((ahead, _)) = self.git().get_branch_status(
999            &project_repo_path,
1000            &task_attempt.branch,
1001            &task_attempt.target_branch,
1002        ) {
1003            ahead > 0
1004        } else {
1005            false
1006        };
1007
1008        if let Some(merge) = &latest_merge
1009            && let Some(commit) = merge.merge_commit()
1010            && self.is_container_clean(task_attempt).await?
1011            && !is_ahead
1012        {
1013            // Try to use merged diff stream, but fall back to live diff if the commit doesn't exist
1014            match self.create_merged_diff_stream(&project_repo_path, &commit, stats_only) {
1015                Ok(wrapper) => return Ok(Box::pin(wrapper)),
1016                Err(e) => {
1017                    tracing::warn!(
1018                        "Failed to create merged diff stream for commit {}: {}. Falling back to live diff.",
1019                        commit,
1020                        e
1021                    );
1022                    // Fall through to live diff stream below
1023                }
1024            }
1025        }
1026
1027        let container_ref = self.ensure_container_exists(task_attempt).await?;
1028        let worktree_path = PathBuf::from(container_ref);
1029        let base_commit = self.git().get_base_commit(
1030            &project_repo_path,
1031            &task_attempt.branch,
1032            &task_attempt.target_branch,
1033        )?;
1034
1035        let wrapper = self
1036            .create_live_diff_stream(&worktree_path, &base_commit, stats_only)
1037            .await?;
1038        Ok(Box::pin(wrapper))
1039    }
1040
1041    async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result<bool, ContainerError> {
1042        if !matches!(
1043            ctx.execution_process.run_reason,
1044            ExecutionProcessRunReason::CodingAgent | ExecutionProcessRunReason::CleanupScript,
1045        ) {
1046            return Ok(false);
1047        }
1048
1049        let message = match ctx.execution_process.run_reason {
1050            ExecutionProcessRunReason::CodingAgent => {
1051                // Try to retrieve the task summary from the executor session
1052                // otherwise fallback to default message
1053                match ExecutorSession::find_by_execution_process_id(
1054                    &self.db().pool,
1055                    ctx.execution_process.id,
1056                )
1057                .await
1058                {
1059                    Ok(Some(session)) if session.summary.is_some() => session.summary.unwrap(),
1060                    Ok(_) => {
1061                        tracing::debug!(
1062                            "No summary found for execution process {}, using default message",
1063                            ctx.execution_process.id
1064                        );
1065                        format!(
1066                            "Commit changes from coding agent for task attempt {}",
1067                            ctx.task_attempt.id
1068                        )
1069                    }
1070                    Err(e) => {
1071                        tracing::debug!(
1072                            "Failed to retrieve summary for execution process {}: {}",
1073                            ctx.execution_process.id,
1074                            e
1075                        );
1076                        format!(
1077                            "Commit changes from coding agent for task attempt {}",
1078                            ctx.task_attempt.id
1079                        )
1080                    }
1081                }
1082            }
1083            ExecutionProcessRunReason::CleanupScript => {
1084                format!(
1085                    "Cleanup script changes for task attempt {}",
1086                    ctx.task_attempt.id
1087                )
1088            }
1089            _ => Err(ContainerError::Other(anyhow::anyhow!(
1090                "Invalid run reason for commit"
1091            )))?,
1092        };
1093
1094        let container_ref = ctx.task_attempt.container_ref.as_ref().ok_or_else(|| {
1095            ContainerError::Other(anyhow::anyhow!("Container reference not found"))
1096        })?;
1097
1098        tracing::debug!(
1099            "Committing changes for task attempt {} at path {:?}: '{}'",
1100            ctx.task_attempt.id,
1101            &container_ref,
1102            message
1103        );
1104
1105        let changes_committed = self.git().commit(Path::new(container_ref), &message)?;
1106        Ok(changes_committed)
1107    }
1108
1109    /// Copy files from the original project directory to the worktree
1110    async fn copy_project_files(
1111        &self,
1112        source_dir: &Path,
1113        target_dir: &Path,
1114        copy_files: &str,
1115    ) -> Result<(), ContainerError> {
1116        let files: Vec<&str> = copy_files
1117            .split(',')
1118            .map(|s| s.trim())
1119            .filter(|s| !s.is_empty())
1120            .collect();
1121
1122        for file_path in files {
1123            let source_file = source_dir.join(file_path);
1124            let target_file = target_dir.join(file_path);
1125
1126            // Create parent directories if needed
1127            if let Some(parent) = target_file.parent()
1128                && !parent.exists()
1129            {
1130                std::fs::create_dir_all(parent).map_err(|e| {
1131                    ContainerError::Other(anyhow!("Failed to create directory {parent:?}: {e}"))
1132                })?;
1133            }
1134
1135            // Copy the file
1136            if source_file.exists() {
1137                std::fs::copy(&source_file, &target_file).map_err(|e| {
1138                    ContainerError::Other(anyhow!(
1139                        "Failed to copy file {source_file:?} to {target_file:?}: {e}"
1140                    ))
1141                })?;
1142                tracing::info!("Copied file {:?} to worktree", file_path);
1143            } else {
1144                return Err(ContainerError::Other(anyhow!(
1145                    "File {source_file:?} does not exist in the project directory"
1146                )));
1147            }
1148        }
1149        Ok(())
1150    }
1151
1152    // =========================================================================
1153    // ExecutionRun methods - lightweight executor invocation without Task
1154    // =========================================================================
1155
1156    /// Create a worktree for an execution run
1157    async fn create_for_run(
1158        &self,
1159        execution_run: &ExecutionRun,
1160    ) -> Result<ContainerRef, ContainerError> {
1161        let project = Project::find_by_id(&self.db.pool, execution_run.project_id)
1162            .await?
1163            .ok_or(sqlx::Error::RowNotFound)?;
1164
1165        let worktree_dir_name = Self::dir_name_from_execution_run(&execution_run.id);
1166        let worktree_path = WorktreeManager::get_worktree_base_dir().join(&worktree_dir_name);
1167
1168        // Create worktree for isolated work
1169        WorktreeManager::create_worktree(
1170            &project.git_repo_path,
1171            &execution_run.branch,
1172            &worktree_path,
1173            &execution_run.target_branch,
1174            true, // create new branch
1175        )
1176        .await?;
1177
1178        // Copy files specified in the project's copy_files field
1179        if let Some(copy_files) = &project.copy_files
1180            && !copy_files.trim().is_empty()
1181        {
1182            self.copy_project_files(&project.git_repo_path, &worktree_path, copy_files)
1183                .await
1184                .unwrap_or_else(|e| {
1185                    tracing::warn!("Failed to copy project files: {}", e);
1186                });
1187        }
1188
1189        // Update container_ref in the database
1190        ExecutionRun::update_container_ref(
1191            &self.db.pool,
1192            execution_run.id,
1193            &worktree_path.to_string_lossy(),
1194        )
1195        .await?;
1196
1197        Ok(worktree_path.to_string_lossy().to_string())
1198    }
1199
1200    /// Inner start execution for runs
1201    async fn start_execution_inner_for_run(
1202        &self,
1203        execution_run: &ExecutionRun,
1204        execution_process: &ExecutionProcess,
1205        executor_action: &ExecutorAction,
1206    ) -> Result<(), ContainerError> {
1207        // Get the worktree path
1208        let container_ref = execution_run
1209            .container_ref
1210            .as_ref()
1211            .ok_or(ContainerError::Other(anyhow!(
1212                "Container ref not found for execution run"
1213            )))?;
1214        let current_dir = PathBuf::from(container_ref);
1215
1216        let approvals_service: Arc<dyn ExecutorApprovalService> =
1217            match executor_action.base_executor() {
1218                Some(BaseCodingAgent::Codex) | Some(BaseCodingAgent::ClaudeCode) => {
1219                    ExecutorApprovalBridge::new(
1220                        self.approvals.clone(),
1221                        self.db.clone(),
1222                        execution_process.id,
1223                    )
1224                }
1225                _ => Arc::new(NoopExecutorApprovalService {}),
1226            };
1227
1228        // Create the child and stream, add to execution tracker
1229        let mut spawned = executor_action
1230            .spawn(&current_dir, approvals_service)
1231            .await?;
1232
1233        self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
1234            .await;
1235
1236        self.add_child_to_store(execution_process.id, spawned.child)
1237            .await;
1238
1239        // Spawn exit monitor for execution run - simpler version without task status updates
1240        let _hn = self.spawn_exit_monitor_for_run(
1241            &execution_process.id,
1242            execution_run.id,
1243            spawned.exit_signal,
1244        );
1245
1246        Ok(())
1247    }
1248}
1249
1250impl LocalContainerService {
1251    /// Generate directory name for execution run worktree
1252    fn dir_name_from_execution_run(run_id: &Uuid) -> String {
1253        format!("run-{}", short_uuid(run_id))
1254    }
1255
1256    /// Spawn exit monitor specifically for execution runs (no task status updates)
1257    fn spawn_exit_monitor_for_run(
1258        &self,
1259        exec_id: &Uuid,
1260        run_id: Uuid,
1261        exit_signal: Option<tokio::sync::oneshot::Receiver<()>>,
1262    ) -> JoinHandle<()> {
1263        let exec_id = *exec_id;
1264        let child_store = self.child_store.clone();
1265        let msg_stores = self.msg_stores.clone();
1266        let db = self.db.clone();
1267        let container = self.clone();
1268
1269        let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);
1270
1271        tokio::spawn(async move {
1272            let mut exit_signal_future = exit_signal
1273                .map(|rx| rx.map(|_| ()).boxed())
1274                .unwrap_or_else(|| std::future::pending::<()>().boxed());
1275
1276            let status_result: std::io::Result<std::process::ExitStatus>;
1277
1278            tokio::select! {
1279                _ = &mut exit_signal_future => {
1280                    if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
1281                        let mut child = child_lock.write().await;
1282                        if let Err(err) = command::kill_process_group(&mut child).await {
1283                            tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
1284                        }
1285                    }
1286                    status_result = Ok(success_exit_status());
1287                }
1288                exit_status_result = &mut process_exit_rx => {
1289                    status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
1290                }
1291            }
1292
1293            let (exit_code, status) = match status_result {
1294                Ok(exit_status) => {
1295                    let code = exit_status.code().unwrap_or(-1) as i64;
1296                    let status = if exit_status.success() {
1297                        ExecutionProcessStatus::Completed
1298                    } else {
1299                        ExecutionProcessStatus::Failed
1300                    };
1301                    (Some(code), status)
1302                }
1303                Err(_) => (None, ExecutionProcessStatus::Failed),
1304            };
1305
1306            if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
1307                && let Err(e) = ExecutionProcess::update_completion(
1308                    &db.pool,
1309                    exec_id,
1310                    status.clone(),
1311                    exit_code,
1312                )
1313                .await
1314            {
1315                tracing::error!("Failed to update execution process completion: {}", e);
1316            }
1317
1318            // Update executor session summary if available
1319            if let Err(e) = container.update_executor_session_summary(&exec_id).await {
1320                tracing::warn!("Failed to update executor session summary: {}", e);
1321            }
1322
1323            // Record after-head commit OID (best-effort)
1324            if let Ok(Some(run)) = ExecutionRun::find_by_id(&db.pool, run_id).await
1325                && let Some(container_ref) = &run.container_ref
1326            {
1327                let worktree = PathBuf::from(container_ref);
1328                if let Ok(head) = container.git().get_head_info(&worktree) {
1329                    let _ =
1330                        ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
1331                            .await;
1332                }
1333            }
1334
1335            // Cleanup msg store
1336            if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
1337                msg_arc.push_finished();
1338                tokio::time::sleep(Duration::from_millis(50)).await;
1339                match Arc::try_unwrap(msg_arc) {
1340                    Ok(inner) => drop(inner),
1341                    Err(arc) => tracing::error!(
1342                        "There are still {} strong Arcs to MsgStore for {}",
1343                        Arc::strong_count(&arc),
1344                        exec_id
1345                    ),
1346                }
1347            }
1348
1349            // Cleanup child handle
1350            child_store.write().await.remove(&exec_id);
1351        })
1352    }
1353
1354    /// Extract the last assistant message from the MsgStore history
1355    fn extract_last_assistant_message(&self, exec_id: &Uuid) -> Option<String> {
1356        // Get the MsgStore for this execution
1357        let msg_stores = self.msg_stores.try_read().ok()?;
1358        let msg_store = msg_stores.get(exec_id)?;
1359
1360        // Get the history and scan in reverse for the last assistant message
1361        let history = msg_store.get_history();
1362
1363        for msg in history.iter().rev() {
1364            if let LogMsg::JsonPatch(patch) = msg {
1365                // Try to extract a NormalizedEntry from the patch
1366                if let Some((_, entry)) = extract_normalized_entry_from_patch(patch)
1367                    && matches!(entry.entry_type, NormalizedEntryType::AssistantMessage)
1368                {
1369                    let content = entry.content.trim();
1370                    if !content.is_empty() {
1371                        const MAX_SUMMARY_LENGTH: usize = 4096;
1372                        if content.len() > MAX_SUMMARY_LENGTH {
1373                            let truncated = truncate_to_char_boundary(content, MAX_SUMMARY_LENGTH);
1374                            return Some(format!("{truncated}..."));
1375                        }
1376                        return Some(content.to_string());
1377                    }
1378                }
1379            }
1380        }
1381
1382        None
1383    }
1384
1385    /// Update the executor session summary with the final assistant message
1386    async fn update_executor_session_summary(&self, exec_id: &Uuid) -> Result<(), anyhow::Error> {
1387        // Check if there's an executor session for this execution process
1388        let session =
1389            ExecutorSession::find_by_execution_process_id(&self.db.pool, *exec_id).await?;
1390
1391        if let Some(session) = session {
1392            // Only update if summary is not already set
1393            if session.summary.is_none() {
1394                if let Some(summary) = self.extract_last_assistant_message(exec_id) {
1395                    ExecutorSession::update_summary(&self.db.pool, *exec_id, &summary).await?;
1396                } else {
1397                    tracing::debug!("No assistant message found for execution {}", exec_id);
1398                }
1399            }
1400        }
1401
1402        Ok(())
1403    }
1404
1405    /// If a queued follow-up draft exists for this attempt and nothing is running,
1406    /// start it immediately and clear the draft.
1407    async fn try_consume_queued_followup(
1408        &self,
1409        ctx: &ExecutionContext,
1410    ) -> Result<(), ContainerError> {
1411        // Only consider CodingAgent/cleanup chains; skip DevServer completions
1412        if matches!(
1413            ctx.execution_process.run_reason,
1414            ExecutionProcessRunReason::DevServer
1415        ) {
1416            return Ok(());
1417        }
1418
1419        // If anything is running for this attempt, bail
1420        let procs =
1421            ExecutionProcess::find_by_task_attempt_id(&self.db.pool, ctx.task_attempt.id, false)
1422                .await?;
1423        if procs
1424            .iter()
1425            .any(|p| matches!(p.status, ExecutionProcessStatus::Running))
1426        {
1427            return Ok(());
1428        }
1429
1430        // Load draft and ensure it's eligible
1431        let Some(draft) = Draft::find_by_task_attempt_and_type(
1432            &self.db.pool,
1433            ctx.task_attempt.id,
1434            DraftType::FollowUp,
1435        )
1436        .await?
1437        else {
1438            return Ok(());
1439        };
1440
1441        if !draft.queued || draft.prompt.trim().is_empty() {
1442            return Ok(());
1443        }
1444
1445        // Atomically acquire sending lock; if not acquired, someone else is sending.
1446        if !Draft::try_mark_sending(&self.db.pool, ctx.task_attempt.id, DraftType::FollowUp)
1447            .await
1448            .unwrap_or(false)
1449        {
1450            return Ok(());
1451        }
1452
1453        // Ensure worktree exists
1454        let container_ref = self.ensure_container_exists(&ctx.task_attempt).await?;
1455
1456        // Get session id
1457        let Some(session_id) = ExecutionProcess::find_latest_session_id_by_task_attempt(
1458            &self.db.pool,
1459            ctx.task_attempt.id,
1460        )
1461        .await?
1462        else {
1463            tracing::warn!(
1464                "No session id found for attempt {}. Cannot start queued follow-up.",
1465                ctx.task_attempt.id
1466            );
1467            return Ok(());
1468        };
1469
1470        // Get last coding agent process to inherit executor profile
1471        let Some(latest) = ExecutionProcess::find_latest_by_task_attempt_and_run_reason(
1472            &self.db.pool,
1473            ctx.task_attempt.id,
1474            &ExecutionProcessRunReason::CodingAgent,
1475        )
1476        .await?
1477        else {
1478            tracing::warn!(
1479                "No prior CodingAgent process for attempt {}. Cannot start queued follow-up.",
1480                ctx.task_attempt.id
1481            );
1482            return Ok(());
1483        };
1484
1485        use forge_core_executors::actions::ExecutorActionType;
1486        let initial_executor_profile_id = match &latest.executor_action()?.typ {
1487            ExecutorActionType::CodingAgentInitialRequest(req) => req.executor_profile_id.clone(),
1488            ExecutorActionType::CodingAgentFollowUpRequest(req) => req.executor_profile_id.clone(),
1489            _ => {
1490                tracing::warn!(
1491                    "Latest process for attempt {} is not a coding agent; skipping queued follow-up",
1492                    ctx.task_attempt.id
1493                );
1494                return Ok(());
1495            }
1496        };
1497
1498        let executor_profile_id = forge_core_executors::profile::ExecutorProfileId {
1499            executor: initial_executor_profile_id.executor,
1500            variant: draft.variant.clone(),
1501        };
1502
1503        // Prepare cleanup action
1504        let cleanup_action = ctx
1505            .task
1506            .parent_project(&self.db.pool)
1507            .await?
1508            .and_then(|project| self.cleanup_action(project.cleanup_script));
1509
1510        // Handle images: associate, copy to worktree, canonicalize prompt
1511        let mut prompt = draft.prompt.clone();
1512        if let Some(image_ids) = &draft.image_ids {
1513            // Associate to task
1514            let _ = TaskImage::associate_many_dedup(&self.db.pool, ctx.task.id, image_ids).await;
1515
1516            // Copy to worktree and canonicalize
1517            let worktree_path = std::path::PathBuf::from(&container_ref);
1518            if let Err(e) = self
1519                .image_service
1520                .copy_images_by_ids_to_worktree(&worktree_path, image_ids)
1521                .await
1522            {
1523                tracing::warn!("Failed to copy images to worktree: {}", e);
1524            } else {
1525                prompt = ImageService::canonicalise_image_paths(&prompt, &worktree_path);
1526            }
1527        }
1528
1529        let follow_up_request =
1530            forge_core_executors::actions::coding_agent_follow_up::CodingAgentFollowUpRequest {
1531                prompt,
1532                session_id,
1533                executor_profile_id,
1534            };
1535
1536        let follow_up_action = forge_core_executors::actions::ExecutorAction::new(
1537            forge_core_executors::actions::ExecutorActionType::CodingAgentFollowUpRequest(
1538                follow_up_request,
1539            ),
1540            cleanup_action,
1541        );
1542
1543        // Start the execution
1544        let _ = self
1545            .start_execution(
1546                &ctx.task_attempt,
1547                &follow_up_action,
1548                &ExecutionProcessRunReason::CodingAgent,
1549            )
1550            .await?;
1551
1552        // Clear the draft to reflect that it has been consumed
1553        let _ =
1554            Draft::clear_after_send(&self.db.pool, ctx.task_attempt.id, DraftType::FollowUp).await;
1555
1556        Ok(())
1557    }
1558}
1559
1560fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
1561    if content.len() <= max_len {
1562        return content;
1563    }
1564
1565    let cutoff = content
1566        .char_indices()
1567        .map(|(idx, _)| idx)
1568        .chain(std::iter::once(content.len()))
1569        .take_while(|&idx| idx <= max_len)
1570        .last()
1571        .unwrap_or(0);
1572
1573    debug_assert!(content.is_char_boundary(cutoff));
1574    &content[..cutoff]
1575}
1576
1577#[cfg(test)]
1578mod tests {
1579
1580    #[test]
1581    fn test_truncate_to_char_boundary() {
1582        use super::truncate_to_char_boundary;
1583
1584        let input = "a".repeat(10);
1585        assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
1586
1587        let input = "hello world";
1588        assert_eq!(truncate_to_char_boundary(input, input.len()), input);
1589
1590        let input = "🔥🔥🔥"; // each fire emoji is 4 bytes
1591        assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
1592        assert_eq!(truncate_to_char_boundary(input, 3), "");
1593    }
1594}