1use std::{
2 collections::HashMap,
3 io,
4 path::{Path, PathBuf},
5 sync::{Arc, atomic::AtomicUsize},
6 time::Duration,
7};
8
9use anyhow::anyhow;
10use async_trait::async_trait;
11use command_group::AsyncGroupChild;
12use forge_core_db::{
13 DBService,
14 models::{
15 draft::{Draft, DraftType},
16 execution_process::{
17 ExecutionContext, ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus,
18 },
19 execution_run::ExecutionRun,
20 executor_session::ExecutorSession,
21 image::TaskImage,
22 merge::Merge,
23 project::Project,
24 task::{Task, TaskStatus},
25 task_attempt::TaskAttempt,
26 },
27};
28use forge_core_deployment::DeploymentError;
29use forge_core_executors::{
30 actions::{Executable, ExecutorAction},
31 approvals::{ExecutorApprovalService, NoopExecutorApprovalService},
32 executors::BaseCodingAgent,
33 logs::{
34 NormalizedEntryType,
35 utils::{
36 ConversationPatch,
37 patch::{escape_json_pointer_segment, extract_normalized_entry_from_patch},
38 },
39 },
40};
41use forge_core_services::services::{
42 analytics::AnalyticsContext,
43 approvals::{Approvals, executor_approvals::ExecutorApprovalBridge},
44 config::Config,
45 container::{ContainerError, ContainerRef, ContainerService},
46 diff_stream::{self, DiffStreamHandle},
47 git::{Commit, DiffTarget, GitService},
48 image::ImageService,
49 notification::NotificationService,
50 worktree_manager::WorktreeManager,
51};
52use forge_core_utils::{
53 log_msg::LogMsg,
54 msg_store::MsgStore,
55 text::{git_branch_id, short_uuid},
56};
57use futures::{FutureExt, StreamExt, TryStreamExt, stream::select};
58use serde_json::json;
59use tokio::{sync::RwLock, task::JoinHandle};
60use tokio_util::io::ReaderStream;
61use uuid::Uuid;
62
63use crate::command;
64
65#[derive(Clone)]
66pub struct LocalContainerService {
67 db: DBService,
68 child_store: Arc<RwLock<HashMap<Uuid, Arc<RwLock<AsyncGroupChild>>>>>,
69 msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
70 config: Arc<RwLock<Config>>,
71 git: GitService,
72 image_service: ImageService,
73 analytics: Option<AnalyticsContext>,
74 approvals: Approvals,
75}
76
77impl LocalContainerService {
78 pub fn new(
79 db: DBService,
80 msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
81 config: Arc<RwLock<Config>>,
82 git: GitService,
83 image_service: ImageService,
84 analytics: Option<AnalyticsContext>,
85 approvals: Approvals,
86 ) -> Self {
87 let child_store = Arc::new(RwLock::new(HashMap::new()));
88
89 LocalContainerService {
90 db,
91 child_store,
92 msg_stores,
93 config,
94 git,
95 image_service,
96 analytics,
97 approvals,
98 }
99 }
100
101 pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
102 let map = self.child_store.read().await;
103 map.get(id).cloned()
104 }
105
106 pub async fn add_child_to_store(&self, id: Uuid, exec: AsyncGroupChild) {
107 let mut map = self.child_store.write().await;
108 map.insert(id, Arc::new(RwLock::new(exec)));
109 }
110
111 pub async fn remove_child_from_store(&self, id: &Uuid) {
112 let mut map = self.child_store.write().await;
113 map.remove(id);
114 }
115
116 fn should_finalize(ctx: &ExecutionContext) -> bool {
120 ctx.execution_process
121 .executor_action()
122 .unwrap()
123 .next_action
124 .is_none()
125 && (!matches!(
126 ctx.execution_process.run_reason,
127 ExecutionProcessRunReason::DevServer
128 ))
129 }
130
131 async fn finalize_task(db: &DBService, config: &Arc<RwLock<Config>>, ctx: &ExecutionContext) {
133 if let Err(e) = Task::update_status(&db.pool, ctx.task.id, TaskStatus::InReview).await {
134 tracing::error!("Failed to update task status to InReview: {e}");
135 }
136 let notify_cfg = config.read().await.notifications.clone();
137 NotificationService::notify_execution_halted(notify_cfg, ctx).await;
138 }
139
140 async fn check_externally_deleted_worktrees(db: &DBService) -> Result<(), DeploymentError> {
142 let active_attempts = TaskAttempt::find_by_worktree_deleted(&db.pool).await?;
143 tracing::debug!(
144 "Checking {} active worktrees for external deletion...",
145 active_attempts.len()
146 );
147 for (attempt_id, worktree_path) in active_attempts {
148 if !std::path::Path::new(&worktree_path).exists() {
150 if let Err(e) = TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await {
152 tracing::error!(
153 "Failed to mark externally deleted worktree as deleted for attempt {}: {}",
154 attempt_id,
155 e
156 );
157 } else {
158 tracing::info!(
159 "Marked externally deleted worktree as deleted for attempt {} (path: {})",
160 attempt_id,
161 worktree_path
162 );
163 }
164 }
165 }
166 Ok(())
167 }
168
169 async fn cleanup_orphaned_worktrees(&self) {
171 if std::env::var("DISABLE_WORKTREE_ORPHAN_CLEANUP").is_ok() {
173 tracing::debug!(
174 "Orphan worktree cleanup is disabled via DISABLE_WORKTREE_ORPHAN_CLEANUP environment variable"
175 );
176 return;
177 }
178 let worktree_base_dir = WorktreeManager::get_worktree_base_dir();
179 if !worktree_base_dir.exists() {
180 tracing::debug!(
181 "Worktree base directory {} does not exist, skipping orphan cleanup",
182 worktree_base_dir.display()
183 );
184 return;
185 }
186 let entries = match std::fs::read_dir(&worktree_base_dir) {
187 Ok(entries) => entries,
188 Err(e) => {
189 tracing::error!(
190 "Failed to read worktree base directory {}: {}",
191 worktree_base_dir.display(),
192 e
193 );
194 return;
195 }
196 };
197 for entry in entries {
198 let entry = match entry {
199 Ok(entry) => entry,
200 Err(e) => {
201 tracing::warn!("Failed to read directory entry: {}", e);
202 continue;
203 }
204 };
205 let path = entry.path();
206 if !path.is_dir() {
208 continue;
209 }
210
211 let worktree_path_str = path.to_string_lossy().to_string();
212 let task_exists =
214 TaskAttempt::container_ref_exists(&self.db().pool, &worktree_path_str)
215 .await
216 .unwrap_or(true);
217 let run_exists =
218 ExecutionRun::container_ref_exists(&self.db().pool, &worktree_path_str)
219 .await
220 .unwrap_or(true);
221
222 if !task_exists && !run_exists {
223 tracing::info!("Found orphaned worktree: {}", worktree_path_str);
225 if let Err(e) = WorktreeManager::cleanup_worktree(&path, None).await {
226 tracing::error!(
227 "Failed to remove orphaned worktree {}: {}",
228 worktree_path_str,
229 e
230 );
231 } else {
232 tracing::info!(
233 "Successfully removed orphaned worktree: {}",
234 worktree_path_str
235 );
236 }
237 }
238 }
239 }
240
241 pub async fn cleanup_expired_attempt(
242 db: &DBService,
243 attempt_id: Uuid,
244 worktree_path: PathBuf,
245 git_repo_path: PathBuf,
246 ) -> Result<(), DeploymentError> {
247 WorktreeManager::cleanup_worktree(&worktree_path, Some(&git_repo_path)).await?;
248 TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await?;
250 tracing::info!("Successfully marked worktree as deleted for attempt {attempt_id}",);
251 Ok(())
252 }
253
254 pub async fn cleanup_expired_attempts(db: &DBService) -> Result<(), DeploymentError> {
255 let expired_attempts = TaskAttempt::find_expired_for_cleanup(&db.pool).await?;
256 if expired_attempts.is_empty() {
257 tracing::debug!("No expired worktrees found");
258 return Ok(());
259 }
260 tracing::info!(
261 "Found {} expired worktrees to clean up",
262 expired_attempts.len()
263 );
264 for (attempt_id, worktree_path, git_repo_path) in expired_attempts {
265 Self::cleanup_expired_attempt(
266 db,
267 attempt_id,
268 PathBuf::from(worktree_path),
269 PathBuf::from(git_repo_path),
270 )
271 .await
272 .unwrap_or_else(|e| {
273 tracing::error!("Failed to clean up expired attempt {attempt_id}: {e}",);
274 });
275 }
276 Ok(())
277 }
278
279 pub async fn spawn_worktree_cleanup(&self) {
280 let db = self.db.clone();
281 let mut cleanup_interval = tokio::time::interval(tokio::time::Duration::from_secs(1800)); self.cleanup_orphaned_worktrees().await;
283 tokio::spawn(async move {
284 loop {
285 cleanup_interval.tick().await;
286 tracing::info!("Starting periodic worktree cleanup...");
287 Self::check_externally_deleted_worktrees(&db)
288 .await
289 .unwrap_or_else(|e| {
290 tracing::error!("Failed to check externally deleted worktrees: {}", e);
291 });
292 Self::cleanup_expired_attempts(&db)
293 .await
294 .unwrap_or_else(|e| {
295 tracing::error!("Failed to clean up expired worktree attempts: {}", e)
296 });
297 }
298 });
299 }
300
301 pub fn spawn_exit_monitor(
304 &self,
305 exec_id: &Uuid,
306 exit_signal: Option<tokio::sync::oneshot::Receiver<()>>,
307 ) -> JoinHandle<()> {
308 let exec_id = *exec_id;
309 let child_store = self.child_store.clone();
310 let msg_stores = self.msg_stores.clone();
311 let db = self.db.clone();
312 let config = self.config.clone();
313 let container = self.clone();
314 let analytics = self.analytics.clone();
315
316 let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);
317
318 tokio::spawn(async move {
319 let mut exit_signal_future = exit_signal
320 .map(|rx| rx.map(|_| ()).boxed()) .unwrap_or_else(|| std::future::pending::<()>().boxed()); let status_result: std::io::Result<std::process::ExitStatus>;
324
325 tokio::select! {
327 _ = &mut exit_signal_future => {
331 if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
333 let mut child = child_lock.write().await ;
334 if let Err(err) = command::kill_process_group(&mut child).await {
335 tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
336 }
337 }
338 status_result = Ok(success_exit_status());
339 }
340 exit_status_result = &mut process_exit_rx => {
342 status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
343 }
344 }
345
346 let (exit_code, status) = match status_result {
347 Ok(exit_status) => {
348 let code = exit_status.code().unwrap_or(-1) as i64;
349 let status = if exit_status.success() {
350 ExecutionProcessStatus::Completed
351 } else {
352 ExecutionProcessStatus::Failed
353 };
354 (Some(code), status)
355 }
356 Err(_) => (None, ExecutionProcessStatus::Failed),
357 };
358
359 if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
360 && let Err(e) = ExecutionProcess::update_completion(
361 &db.pool,
362 exec_id,
363 status.clone(),
364 exit_code,
365 )
366 .await
367 {
368 tracing::error!("Failed to update execution process completion: {}", e);
369 }
370
371 if matches!(status, ExecutionProcessStatus::Failed)
373 && let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await
374 && config.read().await.analytics_enabled == Some(true)
375 && matches!(
376 &ctx.execution_process.run_reason,
377 ExecutionProcessRunReason::CodingAgent
378 )
379 && let Some(analytics) = &analytics
380 {
381 analytics.analytics_service.track_event(
382 &analytics.user_id,
383 "task_attempt_failed",
384 Some(json!({
385 "task_id": ctx.task.id.to_string(),
386 "project_id": ctx.task.project_id.to_string(),
387 "attempt_id": ctx.task_attempt.id.to_string(),
388 "exit_code": exit_code,
389 "executor": ctx.execution_process.executor_action().ok()
390 .and_then(|action| match &action.typ {
391 forge_core_executors::actions::ExecutorActionType::CodingAgentInitialRequest(req) =>
392 Some(req.executor_profile_id.executor),
393 forge_core_executors::actions::ExecutorActionType::CodingAgentFollowUpRequest(req) =>
394 Some(req.executor_profile_id.executor),
395 _ => None,
396 })
397 .map(|e| e.to_string())
398 .unwrap_or_else(|| "unknown".to_string()),
399 })),
400 );
401 }
402
403 if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
404 if let Err(e) = container.update_executor_session_summary(&exec_id).await {
406 tracing::warn!("Failed to update executor session summary: {}", e);
407 }
408
409 let success = matches!(
410 ctx.execution_process.status,
411 ExecutionProcessStatus::Completed
412 ) && exit_code == Some(0);
413
414 let cleanup_done = matches!(
415 ctx.execution_process.run_reason,
416 ExecutionProcessRunReason::CleanupScript
417 ) && !matches!(
418 ctx.execution_process.status,
419 ExecutionProcessStatus::Running
420 );
421
422 if success || cleanup_done {
423 let changes_committed = match container.try_commit_changes(&ctx).await {
425 Ok(committed) => committed,
426 Err(e) => {
427 tracing::error!("Failed to commit changes after execution: {}", e);
428 true
430 }
431 };
432
433 let should_start_next = if matches!(
434 ctx.execution_process.run_reason,
435 ExecutionProcessRunReason::CodingAgent
436 ) {
437 changes_committed
438 } else {
439 true
440 };
441
442 if should_start_next {
443 if let Err(e) = container.try_start_next_action(&ctx).await {
445 tracing::error!("Failed to start next action after completion: {}", e);
446 }
447 } else {
448 tracing::info!(
449 "Skipping cleanup script for task attempt {} - no changes made by coding agent",
450 ctx.task_attempt.id
451 );
452
453 Self::finalize_task(&db, &config, &ctx).await;
455 }
456 }
457
458 if Self::should_finalize(&ctx) {
459 Self::finalize_task(&db, &config, &ctx).await;
460 if let Err(e) = container.try_consume_queued_followup(&ctx).await {
462 tracing::error!(
463 "Failed to start queued follow-up for attempt {}: {}",
464 ctx.task_attempt.id,
465 e
466 );
467 }
468 }
469
470 if config.read().await.analytics_enabled == Some(true)
472 && matches!(
473 &ctx.execution_process.run_reason,
474 ExecutionProcessRunReason::CodingAgent
475 )
476 && let Some(analytics) = &analytics
477 {
478 analytics.analytics_service.track_event(&analytics.user_id, "task_attempt_finished", Some(json!({
479 "task_id": ctx.task.id.to_string(),
480 "project_id": ctx.task.project_id.to_string(),
481 "attempt_id": ctx.task_attempt.id.to_string(),
482 "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed),
483 "exit_code": ctx.execution_process.exit_code,
484 })));
485 }
486 }
487
488 if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
491 let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt);
492 if let Ok(head) = container.git().get_head_info(&worktree_dir)
493 && let Err(e) =
494 ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
495 .await
496 {
497 tracing::warn!("Failed to update after_head_commit for {}: {}", exec_id, e);
498 }
499 }
500
501 if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
503 msg_arc.push_finished();
504 tokio::time::sleep(Duration::from_millis(50)).await; match Arc::try_unwrap(msg_arc) {
506 Ok(inner) => drop(inner),
507 Err(arc) => tracing::error!(
508 "There are still {} strong Arcs to MsgStore for {}",
509 Arc::strong_count(&arc),
510 exec_id
511 ),
512 }
513 }
514
515 child_store.write().await.remove(&exec_id);
517 })
518 }
519
520 pub fn spawn_os_exit_watcher(
521 &self,
522 exec_id: Uuid,
523 ) -> tokio::sync::oneshot::Receiver<std::io::Result<std::process::ExitStatus>> {
524 let (tx, rx) = tokio::sync::oneshot::channel::<std::io::Result<std::process::ExitStatus>>();
525 let child_store = self.child_store.clone();
526 tokio::spawn(async move {
527 loop {
528 let child_lock = {
529 let map = child_store.read().await;
530 map.get(&exec_id).cloned()
531 };
532 if let Some(child_lock) = child_lock {
533 let mut child_handler = child_lock.write().await;
534 match child_handler.try_wait() {
535 Ok(Some(status)) => {
536 let _ = tx.send(Ok(status));
537 break;
538 }
539 Ok(None) => {}
540 Err(e) => {
541 let _ = tx.send(Err(e));
542 break;
543 }
544 }
545 } else {
546 let _ = tx.send(Err(io::Error::other(format!(
547 "Child handle missing for {exec_id}"
548 ))));
549 break;
550 }
551 tokio::time::sleep(Duration::from_millis(250)).await;
552 }
553 });
554 rx
555 }
556
557 pub fn dir_name_from_task_attempt(attempt_id: &Uuid, task_title: &str) -> String {
558 let task_title_id = git_branch_id(task_title);
559 format!("{}-{}", short_uuid(attempt_id), task_title_id)
560 }
561
562 async fn track_child_msgs_in_store(&self, id: Uuid, child: &mut AsyncGroupChild) {
563 let store = Arc::new(MsgStore::new());
564
565 let out = child.inner().stdout.take().expect("no stdout");
566 let err = child.inner().stderr.take().expect("no stderr");
567
568 let out = ReaderStream::new(out)
570 .map_ok(|chunk| LogMsg::Stdout(String::from_utf8_lossy(&chunk).into_owned()));
571
572 let err = ReaderStream::new(err)
574 .map_ok(|chunk| LogMsg::Stderr(String::from_utf8_lossy(&chunk).into_owned()));
575
576 let merged = select(out, err); let debounced = forge_core_utils::stream_ext::debounce_logs(merged);
581 store.clone().spawn_forwarder(debounced);
582
583 let mut map = self.msg_stores().write().await;
584 map.insert(id, store);
585 }
586
587 #[allow(dead_code)]
589 async fn get_worktree_path(
590 &self,
591 task_attempt: &TaskAttempt,
592 ) -> Result<PathBuf, ContainerError> {
593 let container_ref = self.ensure_container_exists(task_attempt).await?;
594 let worktree_dir = PathBuf::from(&container_ref);
595
596 if !worktree_dir.exists() {
597 return Err(ContainerError::Other(anyhow!(
598 "Worktree directory not found"
599 )));
600 }
601
602 Ok(worktree_dir)
603 }
604 async fn get_project_repo_path(
606 &self,
607 task_attempt: &TaskAttempt,
608 ) -> Result<PathBuf, ContainerError> {
609 let project_repo_path = task_attempt
610 .parent_task(&self.db().pool)
611 .await?
612 .ok_or(ContainerError::Other(anyhow!("Parent task not found")))?
613 .parent_project(&self.db().pool)
614 .await?
615 .ok_or(ContainerError::Other(anyhow!("Parent project not found")))?
616 .git_repo_path;
617
618 Ok(project_repo_path)
619 }
620
621 fn create_merged_diff_stream(
623 &self,
624 project_repo_path: &Path,
625 merge_commit_id: &str,
626 stats_only: bool,
627 ) -> Result<DiffStreamHandle, ContainerError> {
628 let diffs = self.git().get_diffs(
629 DiffTarget::Commit {
630 repo_path: project_repo_path,
631 commit_sha: merge_commit_id,
632 },
633 None,
634 )?;
635
636 let cum = Arc::new(AtomicUsize::new(0));
637 let diffs: Vec<_> = diffs
638 .into_iter()
639 .map(|mut d| {
640 diff_stream::apply_stream_omit_policy(&mut d, &cum, stats_only);
641 d
642 })
643 .collect();
644
645 let stream = futures::stream::iter(diffs.into_iter().map(|diff| {
646 let entry_index = GitService::diff_path(&diff);
647 let patch =
648 ConversationPatch::add_diff(escape_json_pointer_segment(&entry_index), diff);
649 Ok::<_, std::io::Error>(LogMsg::JsonPatch(patch))
650 }))
651 .chain(futures::stream::once(async {
652 Ok::<_, std::io::Error>(LogMsg::Finished)
653 }))
654 .boxed();
655
656 Ok(diff_stream::DiffStreamHandle::new(stream, None))
657 }
658
659 async fn create_live_diff_stream(
662 &self,
663 worktree_path: &Path,
664 base_commit: &Commit,
665 stats_only: bool,
666 ) -> Result<DiffStreamHandle, ContainerError> {
667 diff_stream::create(
668 self.git().clone(),
669 worktree_path.to_path_buf(),
670 base_commit.clone(),
671 stats_only,
672 )
673 .await
674 .map_err(|e| ContainerError::Other(anyhow!("{e}")))
675 }
676}
677
/// Returns an `ExitStatus` built from the raw value `0`, i.e. a
/// successful exit, on Unix and Windows.
fn success_exit_status() -> std::process::ExitStatus {
    #[cfg(unix)]
    use std::os::unix::process::ExitStatusExt;
    #[cfg(windows)]
    use std::os::windows::process::ExitStatusExt;

    std::process::ExitStatus::from_raw(0)
}
690
691#[async_trait]
692impl ContainerService for LocalContainerService {
693 fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
694 &self.msg_stores
695 }
696
697 fn db(&self) -> &DBService {
698 &self.db
699 }
700
701 fn git(&self) -> &GitService {
702 &self.git
703 }
704
705 async fn git_branch_prefix(&self) -> String {
706 self.config.read().await.git_branch_prefix.clone()
707 }
708
709 fn task_attempt_to_current_dir(&self, task_attempt: &TaskAttempt) -> PathBuf {
710 PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default())
711 }
712 async fn create(&self, task_attempt: &TaskAttempt) -> Result<ContainerRef, ContainerError> {
714 let task = task_attempt
715 .parent_task(&self.db.pool)
716 .await?
717 .ok_or(sqlx::Error::RowNotFound)?;
718
719 let worktree_dir_name =
720 LocalContainerService::dir_name_from_task_attempt(&task_attempt.id, &task.title);
721 let worktree_path = WorktreeManager::get_worktree_base_dir().join(&worktree_dir_name);
722
723 let project = task
724 .parent_project(&self.db.pool)
725 .await?
726 .ok_or(sqlx::Error::RowNotFound)?;
727
728 let use_worktree = sqlx::query_scalar::<_, bool>(
731 "SELECT COALESCE((SELECT use_worktree FROM forge_task_attempt_config WHERE task_attempt_id = ?), 1)"
732 )
733 .bind(task_attempt.id)
734 .fetch_one(&self.db.pool)
735 .await
736 .unwrap_or(true); let container_ref_path = if use_worktree {
739 WorktreeManager::create_worktree(
741 &project.git_repo_path,
742 &task_attempt.branch,
743 &worktree_path,
744 &task_attempt.target_branch,
745 true, )
747 .await?;
748
749 if let Some(copy_files) = &project.copy_files
751 && !copy_files.trim().is_empty()
752 {
753 self.copy_project_files(&project.git_repo_path, &worktree_path, copy_files)
754 .await
755 .unwrap_or_else(|e| {
756 tracing::warn!("Failed to copy project files: {}", e);
757 });
758 }
759
760 if let Err(e) = self
762 .image_service
763 .copy_images_by_task_to_worktree(&worktree_path, task.id)
764 .await
765 {
766 tracing::warn!("Failed to copy task images to worktree: {}", e);
767 }
768
769 worktree_path
770 } else {
771 PathBuf::from(&project.git_repo_path)
773 };
774
775 TaskAttempt::update_container_ref(
777 &self.db.pool,
778 task_attempt.id,
779 &container_ref_path.to_string_lossy(),
780 )
781 .await?;
782
783 Ok(container_ref_path.to_string_lossy().to_string())
784 }
785
786 async fn delete_inner(&self, task_attempt: &TaskAttempt) -> Result<(), ContainerError> {
787 let task = task_attempt
789 .parent_task(&self.db.pool)
790 .await?
791 .ok_or(sqlx::Error::RowNotFound)?;
792 let git_repo_path = match Project::find_by_id(&self.db.pool, task.project_id).await {
793 Ok(Some(project)) => Some(project.git_repo_path.clone()),
794 Ok(None) => None,
795 Err(e) => {
796 tracing::error!("Failed to fetch project {}: {}", task.project_id, e);
797 None
798 }
799 };
800 WorktreeManager::cleanup_worktree(
801 &PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
802 git_repo_path.as_deref(),
803 )
804 .await
805 .unwrap_or_else(|e| {
806 tracing::warn!(
807 "Failed to clean up worktree for task attempt {}: {}",
808 task_attempt.id,
809 e
810 );
811 });
812 Ok(())
813 }
814
815 async fn ensure_container_exists(
816 &self,
817 task_attempt: &TaskAttempt,
818 ) -> Result<ContainerRef, ContainerError> {
819 let task = task_attempt
821 .parent_task(&self.db.pool)
822 .await?
823 .ok_or(sqlx::Error::RowNotFound)?;
824
825 let project = task
826 .parent_project(&self.db.pool)
827 .await?
828 .ok_or(sqlx::Error::RowNotFound)?;
829
830 let container_ref = task_attempt.container_ref.as_ref().ok_or_else(|| {
831 ContainerError::Other(anyhow!("Container ref not found for task attempt"))
832 })?;
833 let worktree_path = PathBuf::from(container_ref);
834
835 let use_worktree = sqlx::query_scalar::<_, bool>(
838 "SELECT COALESCE((SELECT use_worktree FROM forge_task_attempt_config WHERE task_attempt_id = ?), 1)"
839 )
840 .bind(task_attempt.id)
841 .fetch_one(&self.db.pool)
842 .await
843 .unwrap_or(true); if use_worktree {
847 WorktreeManager::ensure_worktree_exists(
848 &project.git_repo_path,
849 &task_attempt.branch,
850 &worktree_path,
851 )
852 .await?;
853 }
854
855 Ok(container_ref.to_string())
856 }
857
858 async fn is_container_clean(&self, task_attempt: &TaskAttempt) -> Result<bool, ContainerError> {
859 if let Some(container_ref) = &task_attempt.container_ref {
860 let path = PathBuf::from(container_ref);
862 if path.exists() {
863 self.git().is_worktree_clean(&path).map_err(|e| e.into())
864 } else {
865 return Ok(true); }
867 } else {
868 return Ok(true); }
870 }
871
872 async fn start_execution_inner(
873 &self,
874 task_attempt: &TaskAttempt,
875 execution_process: &ExecutionProcess,
876 executor_action: &ExecutorAction,
877 ) -> Result<(), ContainerError> {
878 let container_ref = task_attempt
880 .container_ref
881 .as_ref()
882 .ok_or(ContainerError::Other(anyhow!(
883 "Container ref not found for task attempt"
884 )))?;
885 let current_dir = PathBuf::from(container_ref);
886
887 let approvals_service: Arc<dyn ExecutorApprovalService> =
888 match executor_action.base_executor() {
889 Some(BaseCodingAgent::Codex) | Some(BaseCodingAgent::ClaudeCode) => {
890 ExecutorApprovalBridge::new(
891 self.approvals.clone(),
892 self.db.clone(),
893 execution_process.id,
894 )
895 }
896 _ => Arc::new(NoopExecutorApprovalService {}),
897 };
898
899 let mut spawned = executor_action
901 .spawn(¤t_dir, approvals_service)
902 .await?;
903
904 self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
905 .await;
906
907 self.add_child_to_store(execution_process.id, spawned.child)
908 .await;
909
910 let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal);
912
913 Ok(())
914 }
915
916 async fn stop_execution(
917 &self,
918 execution_process: &ExecutionProcess,
919 status: ExecutionProcessStatus,
920 ) -> Result<(), ContainerError> {
921 let child = self
922 .get_child_from_store(&execution_process.id)
923 .await
924 .ok_or_else(|| {
925 ContainerError::Other(anyhow!("Child process not found for execution"))
926 })?;
927 let exit_code = if status == ExecutionProcessStatus::Completed {
928 Some(0)
929 } else {
930 None
931 };
932
933 ExecutionProcess::update_completion(&self.db.pool, execution_process.id, status, exit_code)
934 .await?;
935
936 {
938 let mut child_guard = child.write().await;
939 if let Err(e) = command::kill_process_group(&mut child_guard).await {
940 tracing::error!(
941 "Failed to stop execution process {}: {}",
942 execution_process.id,
943 e
944 );
945 return Err(e);
946 }
947 }
948 self.remove_child_from_store(&execution_process.id).await;
949
950 if let Some(msg) = self.msg_stores.write().await.remove(&execution_process.id) {
952 msg.push_finished();
953 }
954
955 if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await
957 && !matches!(
958 ctx.execution_process.run_reason,
959 ExecutionProcessRunReason::DevServer
960 )
961 && let Err(e) =
962 Task::update_status(&self.db.pool, ctx.task.id, TaskStatus::InReview).await
963 {
964 tracing::error!("Failed to update task status to InReview: {e}");
965 }
966
967 tracing::debug!(
968 "Execution process {} stopped successfully",
969 execution_process.id
970 );
971
972 if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await {
974 let worktree = self.task_attempt_to_current_dir(&ctx.task_attempt);
975 if let Ok(head) = self.git().get_head_info(&worktree) {
976 let _ = ExecutionProcess::update_after_head_commit(
977 &self.db.pool,
978 execution_process.id,
979 &head.oid,
980 )
981 .await;
982 }
983 }
984
985 Ok(())
986 }
987
988 async fn stream_diff(
989 &self,
990 task_attempt: &TaskAttempt,
991 stats_only: bool,
992 ) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
993 {
994 let project_repo_path = self.get_project_repo_path(task_attempt).await?;
995 let latest_merge =
996 Merge::find_latest_by_task_attempt_id(&self.db.pool, task_attempt.id).await?;
997
998 let is_ahead = if let Ok((ahead, _)) = self.git().get_branch_status(
999 &project_repo_path,
1000 &task_attempt.branch,
1001 &task_attempt.target_branch,
1002 ) {
1003 ahead > 0
1004 } else {
1005 false
1006 };
1007
1008 if let Some(merge) = &latest_merge
1009 && let Some(commit) = merge.merge_commit()
1010 && self.is_container_clean(task_attempt).await?
1011 && !is_ahead
1012 {
1013 match self.create_merged_diff_stream(&project_repo_path, &commit, stats_only) {
1015 Ok(wrapper) => return Ok(Box::pin(wrapper)),
1016 Err(e) => {
1017 tracing::warn!(
1018 "Failed to create merged diff stream for commit {}: {}. Falling back to live diff.",
1019 commit,
1020 e
1021 );
1022 }
1024 }
1025 }
1026
1027 let container_ref = self.ensure_container_exists(task_attempt).await?;
1028 let worktree_path = PathBuf::from(container_ref);
1029 let base_commit = self.git().get_base_commit(
1030 &project_repo_path,
1031 &task_attempt.branch,
1032 &task_attempt.target_branch,
1033 )?;
1034
1035 let wrapper = self
1036 .create_live_diff_stream(&worktree_path, &base_commit, stats_only)
1037 .await?;
1038 Ok(Box::pin(wrapper))
1039 }
1040
1041 async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result<bool, ContainerError> {
1042 if !matches!(
1043 ctx.execution_process.run_reason,
1044 ExecutionProcessRunReason::CodingAgent | ExecutionProcessRunReason::CleanupScript,
1045 ) {
1046 return Ok(false);
1047 }
1048
1049 let message = match ctx.execution_process.run_reason {
1050 ExecutionProcessRunReason::CodingAgent => {
1051 match ExecutorSession::find_by_execution_process_id(
1054 &self.db().pool,
1055 ctx.execution_process.id,
1056 )
1057 .await
1058 {
1059 Ok(Some(session)) if session.summary.is_some() => session.summary.unwrap(),
1060 Ok(_) => {
1061 tracing::debug!(
1062 "No summary found for execution process {}, using default message",
1063 ctx.execution_process.id
1064 );
1065 format!(
1066 "Commit changes from coding agent for task attempt {}",
1067 ctx.task_attempt.id
1068 )
1069 }
1070 Err(e) => {
1071 tracing::debug!(
1072 "Failed to retrieve summary for execution process {}: {}",
1073 ctx.execution_process.id,
1074 e
1075 );
1076 format!(
1077 "Commit changes from coding agent for task attempt {}",
1078 ctx.task_attempt.id
1079 )
1080 }
1081 }
1082 }
1083 ExecutionProcessRunReason::CleanupScript => {
1084 format!(
1085 "Cleanup script changes for task attempt {}",
1086 ctx.task_attempt.id
1087 )
1088 }
1089 _ => Err(ContainerError::Other(anyhow::anyhow!(
1090 "Invalid run reason for commit"
1091 )))?,
1092 };
1093
1094 let container_ref = ctx.task_attempt.container_ref.as_ref().ok_or_else(|| {
1095 ContainerError::Other(anyhow::anyhow!("Container reference not found"))
1096 })?;
1097
1098 tracing::debug!(
1099 "Committing changes for task attempt {} at path {:?}: '{}'",
1100 ctx.task_attempt.id,
1101 &container_ref,
1102 message
1103 );
1104
1105 let changes_committed = self.git().commit(Path::new(container_ref), &message)?;
1106 Ok(changes_committed)
1107 }
1108
1109 async fn copy_project_files(
1111 &self,
1112 source_dir: &Path,
1113 target_dir: &Path,
1114 copy_files: &str,
1115 ) -> Result<(), ContainerError> {
1116 let files: Vec<&str> = copy_files
1117 .split(',')
1118 .map(|s| s.trim())
1119 .filter(|s| !s.is_empty())
1120 .collect();
1121
1122 for file_path in files {
1123 let source_file = source_dir.join(file_path);
1124 let target_file = target_dir.join(file_path);
1125
1126 if let Some(parent) = target_file.parent()
1128 && !parent.exists()
1129 {
1130 std::fs::create_dir_all(parent).map_err(|e| {
1131 ContainerError::Other(anyhow!("Failed to create directory {parent:?}: {e}"))
1132 })?;
1133 }
1134
1135 if source_file.exists() {
1137 std::fs::copy(&source_file, &target_file).map_err(|e| {
1138 ContainerError::Other(anyhow!(
1139 "Failed to copy file {source_file:?} to {target_file:?}: {e}"
1140 ))
1141 })?;
1142 tracing::info!("Copied file {:?} to worktree", file_path);
1143 } else {
1144 return Err(ContainerError::Other(anyhow!(
1145 "File {source_file:?} does not exist in the project directory"
1146 )));
1147 }
1148 }
1149 Ok(())
1150 }
1151
1152 async fn create_for_run(
1158 &self,
1159 execution_run: &ExecutionRun,
1160 ) -> Result<ContainerRef, ContainerError> {
1161 let project = Project::find_by_id(&self.db.pool, execution_run.project_id)
1162 .await?
1163 .ok_or(sqlx::Error::RowNotFound)?;
1164
1165 let worktree_dir_name = Self::dir_name_from_execution_run(&execution_run.id);
1166 let worktree_path = WorktreeManager::get_worktree_base_dir().join(&worktree_dir_name);
1167
1168 WorktreeManager::create_worktree(
1170 &project.git_repo_path,
1171 &execution_run.branch,
1172 &worktree_path,
1173 &execution_run.target_branch,
1174 true, )
1176 .await?;
1177
1178 if let Some(copy_files) = &project.copy_files
1180 && !copy_files.trim().is_empty()
1181 {
1182 self.copy_project_files(&project.git_repo_path, &worktree_path, copy_files)
1183 .await
1184 .unwrap_or_else(|e| {
1185 tracing::warn!("Failed to copy project files: {}", e);
1186 });
1187 }
1188
1189 ExecutionRun::update_container_ref(
1191 &self.db.pool,
1192 execution_run.id,
1193 &worktree_path.to_string_lossy(),
1194 )
1195 .await?;
1196
1197 Ok(worktree_path.to_string_lossy().to_string())
1198 }
1199
1200 async fn start_execution_inner_for_run(
1202 &self,
1203 execution_run: &ExecutionRun,
1204 execution_process: &ExecutionProcess,
1205 executor_action: &ExecutorAction,
1206 ) -> Result<(), ContainerError> {
1207 let container_ref = execution_run
1209 .container_ref
1210 .as_ref()
1211 .ok_or(ContainerError::Other(anyhow!(
1212 "Container ref not found for execution run"
1213 )))?;
1214 let current_dir = PathBuf::from(container_ref);
1215
1216 let approvals_service: Arc<dyn ExecutorApprovalService> =
1217 match executor_action.base_executor() {
1218 Some(BaseCodingAgent::Codex) | Some(BaseCodingAgent::ClaudeCode) => {
1219 ExecutorApprovalBridge::new(
1220 self.approvals.clone(),
1221 self.db.clone(),
1222 execution_process.id,
1223 )
1224 }
1225 _ => Arc::new(NoopExecutorApprovalService {}),
1226 };
1227
1228 let mut spawned = executor_action
1230 .spawn(¤t_dir, approvals_service)
1231 .await?;
1232
1233 self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
1234 .await;
1235
1236 self.add_child_to_store(execution_process.id, spawned.child)
1237 .await;
1238
1239 let _hn = self.spawn_exit_monitor_for_run(
1241 &execution_process.id,
1242 execution_run.id,
1243 spawned.exit_signal,
1244 );
1245
1246 Ok(())
1247 }
1248}
1249
1250impl LocalContainerService {
1251 fn dir_name_from_execution_run(run_id: &Uuid) -> String {
1253 format!("run-{}", short_uuid(run_id))
1254 }
1255
1256 fn spawn_exit_monitor_for_run(
1258 &self,
1259 exec_id: &Uuid,
1260 run_id: Uuid,
1261 exit_signal: Option<tokio::sync::oneshot::Receiver<()>>,
1262 ) -> JoinHandle<()> {
1263 let exec_id = *exec_id;
1264 let child_store = self.child_store.clone();
1265 let msg_stores = self.msg_stores.clone();
1266 let db = self.db.clone();
1267 let container = self.clone();
1268
1269 let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);
1270
1271 tokio::spawn(async move {
1272 let mut exit_signal_future = exit_signal
1273 .map(|rx| rx.map(|_| ()).boxed())
1274 .unwrap_or_else(|| std::future::pending::<()>().boxed());
1275
1276 let status_result: std::io::Result<std::process::ExitStatus>;
1277
1278 tokio::select! {
1279 _ = &mut exit_signal_future => {
1280 if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
1281 let mut child = child_lock.write().await;
1282 if let Err(err) = command::kill_process_group(&mut child).await {
1283 tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
1284 }
1285 }
1286 status_result = Ok(success_exit_status());
1287 }
1288 exit_status_result = &mut process_exit_rx => {
1289 status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
1290 }
1291 }
1292
1293 let (exit_code, status) = match status_result {
1294 Ok(exit_status) => {
1295 let code = exit_status.code().unwrap_or(-1) as i64;
1296 let status = if exit_status.success() {
1297 ExecutionProcessStatus::Completed
1298 } else {
1299 ExecutionProcessStatus::Failed
1300 };
1301 (Some(code), status)
1302 }
1303 Err(_) => (None, ExecutionProcessStatus::Failed),
1304 };
1305
1306 if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
1307 && let Err(e) = ExecutionProcess::update_completion(
1308 &db.pool,
1309 exec_id,
1310 status.clone(),
1311 exit_code,
1312 )
1313 .await
1314 {
1315 tracing::error!("Failed to update execution process completion: {}", e);
1316 }
1317
1318 if let Err(e) = container.update_executor_session_summary(&exec_id).await {
1320 tracing::warn!("Failed to update executor session summary: {}", e);
1321 }
1322
1323 if let Ok(Some(run)) = ExecutionRun::find_by_id(&db.pool, run_id).await
1325 && let Some(container_ref) = &run.container_ref
1326 {
1327 let worktree = PathBuf::from(container_ref);
1328 if let Ok(head) = container.git().get_head_info(&worktree) {
1329 let _ =
1330 ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
1331 .await;
1332 }
1333 }
1334
1335 if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
1337 msg_arc.push_finished();
1338 tokio::time::sleep(Duration::from_millis(50)).await;
1339 match Arc::try_unwrap(msg_arc) {
1340 Ok(inner) => drop(inner),
1341 Err(arc) => tracing::error!(
1342 "There are still {} strong Arcs to MsgStore for {}",
1343 Arc::strong_count(&arc),
1344 exec_id
1345 ),
1346 }
1347 }
1348
1349 child_store.write().await.remove(&exec_id);
1351 })
1352 }
1353
1354 fn extract_last_assistant_message(&self, exec_id: &Uuid) -> Option<String> {
1356 let msg_stores = self.msg_stores.try_read().ok()?;
1358 let msg_store = msg_stores.get(exec_id)?;
1359
1360 let history = msg_store.get_history();
1362
1363 for msg in history.iter().rev() {
1364 if let LogMsg::JsonPatch(patch) = msg {
1365 if let Some((_, entry)) = extract_normalized_entry_from_patch(patch)
1367 && matches!(entry.entry_type, NormalizedEntryType::AssistantMessage)
1368 {
1369 let content = entry.content.trim();
1370 if !content.is_empty() {
1371 const MAX_SUMMARY_LENGTH: usize = 4096;
1372 if content.len() > MAX_SUMMARY_LENGTH {
1373 let truncated = truncate_to_char_boundary(content, MAX_SUMMARY_LENGTH);
1374 return Some(format!("{truncated}..."));
1375 }
1376 return Some(content.to_string());
1377 }
1378 }
1379 }
1380 }
1381
1382 None
1383 }
1384
1385 async fn update_executor_session_summary(&self, exec_id: &Uuid) -> Result<(), anyhow::Error> {
1387 let session =
1389 ExecutorSession::find_by_execution_process_id(&self.db.pool, *exec_id).await?;
1390
1391 if let Some(session) = session {
1392 if session.summary.is_none() {
1394 if let Some(summary) = self.extract_last_assistant_message(exec_id) {
1395 ExecutorSession::update_summary(&self.db.pool, *exec_id, &summary).await?;
1396 } else {
1397 tracing::debug!("No assistant message found for execution {}", exec_id);
1398 }
1399 }
1400 }
1401
1402 Ok(())
1403 }
1404
1405 async fn try_consume_queued_followup(
1408 &self,
1409 ctx: &ExecutionContext,
1410 ) -> Result<(), ContainerError> {
1411 if matches!(
1413 ctx.execution_process.run_reason,
1414 ExecutionProcessRunReason::DevServer
1415 ) {
1416 return Ok(());
1417 }
1418
1419 let procs =
1421 ExecutionProcess::find_by_task_attempt_id(&self.db.pool, ctx.task_attempt.id, false)
1422 .await?;
1423 if procs
1424 .iter()
1425 .any(|p| matches!(p.status, ExecutionProcessStatus::Running))
1426 {
1427 return Ok(());
1428 }
1429
1430 let Some(draft) = Draft::find_by_task_attempt_and_type(
1432 &self.db.pool,
1433 ctx.task_attempt.id,
1434 DraftType::FollowUp,
1435 )
1436 .await?
1437 else {
1438 return Ok(());
1439 };
1440
1441 if !draft.queued || draft.prompt.trim().is_empty() {
1442 return Ok(());
1443 }
1444
1445 if !Draft::try_mark_sending(&self.db.pool, ctx.task_attempt.id, DraftType::FollowUp)
1447 .await
1448 .unwrap_or(false)
1449 {
1450 return Ok(());
1451 }
1452
1453 let container_ref = self.ensure_container_exists(&ctx.task_attempt).await?;
1455
1456 let Some(session_id) = ExecutionProcess::find_latest_session_id_by_task_attempt(
1458 &self.db.pool,
1459 ctx.task_attempt.id,
1460 )
1461 .await?
1462 else {
1463 tracing::warn!(
1464 "No session id found for attempt {}. Cannot start queued follow-up.",
1465 ctx.task_attempt.id
1466 );
1467 return Ok(());
1468 };
1469
1470 let Some(latest) = ExecutionProcess::find_latest_by_task_attempt_and_run_reason(
1472 &self.db.pool,
1473 ctx.task_attempt.id,
1474 &ExecutionProcessRunReason::CodingAgent,
1475 )
1476 .await?
1477 else {
1478 tracing::warn!(
1479 "No prior CodingAgent process for attempt {}. Cannot start queued follow-up.",
1480 ctx.task_attempt.id
1481 );
1482 return Ok(());
1483 };
1484
1485 use forge_core_executors::actions::ExecutorActionType;
1486 let initial_executor_profile_id = match &latest.executor_action()?.typ {
1487 ExecutorActionType::CodingAgentInitialRequest(req) => req.executor_profile_id.clone(),
1488 ExecutorActionType::CodingAgentFollowUpRequest(req) => req.executor_profile_id.clone(),
1489 _ => {
1490 tracing::warn!(
1491 "Latest process for attempt {} is not a coding agent; skipping queued follow-up",
1492 ctx.task_attempt.id
1493 );
1494 return Ok(());
1495 }
1496 };
1497
1498 let executor_profile_id = forge_core_executors::profile::ExecutorProfileId {
1499 executor: initial_executor_profile_id.executor,
1500 variant: draft.variant.clone(),
1501 };
1502
1503 let cleanup_action = ctx
1505 .task
1506 .parent_project(&self.db.pool)
1507 .await?
1508 .and_then(|project| self.cleanup_action(project.cleanup_script));
1509
1510 let mut prompt = draft.prompt.clone();
1512 if let Some(image_ids) = &draft.image_ids {
1513 let _ = TaskImage::associate_many_dedup(&self.db.pool, ctx.task.id, image_ids).await;
1515
1516 let worktree_path = std::path::PathBuf::from(&container_ref);
1518 if let Err(e) = self
1519 .image_service
1520 .copy_images_by_ids_to_worktree(&worktree_path, image_ids)
1521 .await
1522 {
1523 tracing::warn!("Failed to copy images to worktree: {}", e);
1524 } else {
1525 prompt = ImageService::canonicalise_image_paths(&prompt, &worktree_path);
1526 }
1527 }
1528
1529 let follow_up_request =
1530 forge_core_executors::actions::coding_agent_follow_up::CodingAgentFollowUpRequest {
1531 prompt,
1532 session_id,
1533 executor_profile_id,
1534 };
1535
1536 let follow_up_action = forge_core_executors::actions::ExecutorAction::new(
1537 forge_core_executors::actions::ExecutorActionType::CodingAgentFollowUpRequest(
1538 follow_up_request,
1539 ),
1540 cleanup_action,
1541 );
1542
1543 let _ = self
1545 .start_execution(
1546 &ctx.task_attempt,
1547 &follow_up_action,
1548 &ExecutionProcessRunReason::CodingAgent,
1549 )
1550 .await?;
1551
1552 let _ =
1554 Draft::clear_after_send(&self.db.pool, ctx.task_attempt.id, DraftType::FollowUp).await;
1555
1556 Ok(())
1557 }
1558}
1559
/// Returns the longest prefix of `content` that is at most `max_len` bytes long and ends
/// on a UTF-8 character boundary.
///
/// `content` is returned unchanged when it already fits. The result may be empty when the
/// first character alone is longer than `max_len` bytes.
fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
    if max_len >= content.len() {
        return content;
    }

    // Walk back from the byte limit to the nearest boundary. Index 0 is always a boundary,
    // so the search cannot come up empty; the fallback only satisfies the type.
    let cutoff = (0..=max_len)
        .rev()
        .find(|&idx| content.is_char_boundary(idx))
        .unwrap_or(0);

    debug_assert!(content.is_char_boundary(cutoff));
    &content[..cutoff]
}
1576
#[cfg(test)]
mod tests {
    use super::truncate_to_char_boundary;

    /// ASCII input is cut at exactly the requested byte length.
    #[test]
    fn truncates_ascii_at_byte_limit() {
        let input = "a".repeat(10);
        assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
    }

    /// Input that already fits is returned unchanged, including the empty string.
    #[test]
    fn returns_input_when_limit_covers_it() {
        let input = "hello world";
        assert_eq!(truncate_to_char_boundary(input, input.len()), input);
        assert_eq!(truncate_to_char_boundary(input, input.len() + 1), input);
        assert_eq!(truncate_to_char_boundary("", 0), "");
    }

    /// A limit that falls inside a multi-byte character backs off to the previous boundary.
    #[test]
    fn never_splits_multibyte_chars() {
        // Each flame emoji is four bytes in UTF-8.
        let input = "🔥🔥🔥";
        assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
        assert_eq!(truncate_to_char_boundary(input, 3), "");
        assert_eq!(truncate_to_char_boundary(input, 8), "🔥🔥");
        assert_eq!(truncate_to_char_boundary(input, 0), "");
    }
}