forge_core_deployment/
lib.rs

1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Error as AnyhowError;
4use async_trait::async_trait;
5use axum::response::sse::Event;
6use forge_core_db::{
7    DBService,
8    models::{
9        execution_process::{ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus},
10        project::{CreateProject, Project},
11        task::{Task, TaskStatus},
12        task_attempt::{TaskAttempt, TaskAttemptError},
13    },
14};
15use forge_core_executors::executors::ExecutorError;
16use forge_core_services::services::{
17    analytics::{AnalyticsContext, AnalyticsService},
18    approvals::Approvals,
19    auth::{AuthError, AuthService},
20    config::{Config, ConfigError},
21    container::{ContainerError, ContainerService},
22    drafts::DraftsService,
23    events::{EventError, EventService},
24    file_search_cache::FileSearchCache,
25    filesystem::{FilesystemError, FilesystemService},
26    filesystem_watcher::FilesystemWatcherError,
27    forge_config::ForgeConfigService,
28    git::{GitService, GitServiceError},
29    image::{ImageError, ImageService},
30    omni::OmniService,
31    pr_monitor::PrMonitorService,
32    profile_loader::ProfileCacheManager,
33    worktree_manager::WorktreeError,
34};
35use forge_core_utils::{msg_store::MsgStore, sentry as sentry_utils};
36use futures::{StreamExt, TryStreamExt};
37use git2::Error as Git2Error;
38use serde_json::Value;
39use sqlx::{Error as SqlxError, types::Uuid};
40use thiserror::Error;
41use tokio::sync::RwLock;
42
43#[derive(Debug, Error)]
44pub enum DeploymentError {
45    #[error(transparent)]
46    Io(#[from] std::io::Error),
47    #[error(transparent)]
48    Sqlx(#[from] SqlxError),
49    #[error(transparent)]
50    Git2(#[from] Git2Error),
51    #[error(transparent)]
52    GitServiceError(#[from] GitServiceError),
53    #[error(transparent)]
54    FilesystemWatcherError(#[from] FilesystemWatcherError),
55    #[error(transparent)]
56    TaskAttempt(#[from] TaskAttemptError),
57    #[error(transparent)]
58    Container(#[from] ContainerError),
59    #[error(transparent)]
60    Executor(#[from] ExecutorError),
61    #[error(transparent)]
62    Auth(#[from] AuthError),
63    #[error(transparent)]
64    Image(#[from] ImageError),
65    #[error(transparent)]
66    Filesystem(#[from] FilesystemError),
67    #[error(transparent)]
68    Worktree(#[from] WorktreeError),
69    #[error(transparent)]
70    Event(#[from] EventError),
71    #[error(transparent)]
72    Config(#[from] ConfigError),
73    #[error(transparent)]
74    Other(#[from] AnyhowError),
75}
76
77#[async_trait]
78pub trait Deployment: Clone + Send + Sync + 'static {
79    async fn new() -> Result<Self, DeploymentError>;
80
81    fn user_id(&self) -> &str;
82
83    fn shared_types() -> Vec<String>;
84
85    fn config(&self) -> &Arc<RwLock<Config>>;
86
87    fn db(&self) -> &DBService;
88
89    fn analytics(&self) -> &Option<AnalyticsService>;
90
91    fn container(&self) -> &impl ContainerService;
92
93    fn auth(&self) -> &AuthService;
94
95    fn git(&self) -> &GitService;
96
97    fn image(&self) -> &ImageService;
98
99    fn filesystem(&self) -> &FilesystemService;
100
101    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>;
102
103    fn events(&self) -> &EventService;
104
105    fn file_search_cache(&self) -> &Arc<FileSearchCache>;
106
107    fn approvals(&self) -> &Approvals;
108
109    fn drafts(&self) -> &DraftsService;
110
111    fn forge_config(&self) -> &ForgeConfigService;
112
113    fn omni(&self) -> &Arc<RwLock<OmniService>>;
114
115    fn profile_cache(&self) -> &ProfileCacheManager;
116
117    async fn update_sentry_scope(&self) -> Result<(), DeploymentError> {
118        let user_id = self.user_id();
119        let config = self.config().read().await;
120        let username = config.github.username.as_deref();
121        let email = config.github.primary_email.as_deref();
122        sentry_utils::configure_user_scope(user_id, username, email);
123
124        Ok(())
125    }
126
127    async fn spawn_pr_monitor_service(&self) -> tokio::task::JoinHandle<()> {
128        let db = self.db().clone();
129        let config = self.config().clone();
130        let analytics = self
131            .analytics()
132            .as_ref()
133            .map(|analytics_service| AnalyticsContext {
134                user_id: self.user_id().to_string(),
135                analytics_service: analytics_service.clone(),
136            });
137        PrMonitorService::spawn(db, config, analytics).await
138    }
139
140    async fn track_if_analytics_allowed(&self, event_name: &str, properties: Value) {
141        let analytics_enabled = self.config().read().await.analytics_enabled;
142        // Only skip tracking if user explicitly opted out (Some(false))
143        // Send for None (undecided) and Some(true) (opted in)
144        if analytics_enabled != Some(false)
145            && let Some(analytics) = self.analytics()
146        {
147            analytics.track_event(self.user_id(), event_name, Some(properties.clone()));
148        }
149    }
150
151    /// Cleanup executions marked as running in the db, call at startup
152    async fn cleanup_orphan_executions(&self) -> Result<(), DeploymentError> {
153        let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
154        for process in running_processes {
155            tracing::info!(
156                "Found orphaned execution process {} for task attempt {:?}",
157                process.id,
158                process.task_attempt_id
159            );
160            // Update the execution process status first
161            if let Err(e) = ExecutionProcess::update_completion(
162                &self.db().pool,
163                process.id,
164                ExecutionProcessStatus::Failed,
165                None, // No exit code for orphaned processes
166            )
167            .await
168            {
169                tracing::error!(
170                    "Failed to update orphaned execution process {} status: {}",
171                    process.id,
172                    e
173                );
174                continue;
175            }
176            // Capture after-head commit OID (best-effort) - only for task-attempt-based processes
177            if let Some(attempt_id) = process.task_attempt_id
178                && let Ok(Some(task_attempt)) =
179                    TaskAttempt::find_by_id(&self.db().pool, attempt_id).await
180                && let Some(container_ref) = task_attempt.container_ref
181            {
182                let wt = std::path::PathBuf::from(container_ref);
183                if let Ok(head) = self.git().get_head_info(&wt) {
184                    let _ = ExecutionProcess::update_after_head_commit(
185                        &self.db().pool,
186                        process.id,
187                        &head.oid,
188                    )
189                    .await;
190                }
191            }
192            // Process marked as failed
193            tracing::info!("Marked orphaned execution process {} as failed", process.id);
194            // Update task status to InReview for coding agent and setup script failures
195            if matches!(
196                process.run_reason,
197                ExecutionProcessRunReason::CodingAgent
198                    | ExecutionProcessRunReason::SetupScript
199                    | ExecutionProcessRunReason::CleanupScript
200            ) && let Some(attempt_id) = process.task_attempt_id
201                && let Ok(Some(task_attempt)) =
202                    TaskAttempt::find_by_id(&self.db().pool, attempt_id).await
203                && let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
204                && let Err(e) =
205                    Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await
206            {
207                tracing::error!(
208                    "Failed to update task status to InReview for orphaned attempt: {}",
209                    e
210                );
211            }
212        }
213        Ok(())
214    }
215
216    /// Backfill before_head_commit for legacy execution processes.
217    /// Rules:
218    /// - If a process has after_head_commit and missing before_head_commit,
219    ///   then set before_head_commit to the previous process's after_head_commit.
220    /// - If there is no previous process, set before_head_commit to the base branch commit.
221    async fn backfill_before_head_commits(&self) -> Result<(), DeploymentError> {
222        let pool = &self.db().pool;
223        let rows = ExecutionProcess::list_missing_before_context(pool).await?;
224        for row in rows {
225            // Skip if no after commit at all (shouldn't happen due to WHERE)
226            // Prefer previous process after-commit if present
227            let mut before = row.prev_after_head_commit.clone();
228
229            // Fallback to base branch commit OID
230            if before.is_none() {
231                let repo_path =
232                    std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
233                match self
234                    .git()
235                    .get_branch_oid(repo_path, row.target_branch.as_str())
236                {
237                    Ok(oid) => before = Some(oid),
238                    Err(e) => {
239                        tracing::warn!(
240                            "Backfill: Failed to resolve base branch OID for attempt {:?} (branch {}): {}",
241                            row.task_attempt_id,
242                            row.target_branch,
243                            e
244                        );
245                    }
246                }
247            }
248
249            if let Some(before_oid) = before
250                && let Err(e) =
251                    ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
252            {
253                tracing::warn!(
254                    "Backfill: Failed to update before_head_commit for process {}: {}",
255                    row.id,
256                    e
257                );
258            }
259        }
260
261        Ok(())
262    }
263
264    /// Trigger background auto-setup of default projects for new users
265    async fn trigger_auto_project_setup(&self) {
266        // soft timeout to give the filesystem search a chance to complete
267        let soft_timeout_ms = 2_000;
268        // hard timeout to ensure the background task doesn't run indefinitely
269        let hard_timeout_ms = 2_300;
270        let project_count = Project::count(&self.db().pool).await.unwrap_or(0);
271
272        // Only proceed if no projects exist
273        if project_count == 0 {
274            // Discover local git repositories
275            if let Ok(repos) = self
276                .filesystem()
277                .list_common_git_repos(soft_timeout_ms, hard_timeout_ms, Some(4))
278                .await
279            {
280                // Take first 3 repositories and create projects
281                for repo in repos.into_iter().take(3) {
282                    // Generate clean project name from path
283                    let project_name = repo.name;
284
285                    let create_data = CreateProject {
286                        name: project_name,
287                        git_repo_path: repo.path.to_string_lossy().to_string(),
288                        use_existing_repo: true,
289                        setup_script: None,
290                        dev_script: None,
291                        cleanup_script: None,
292                        copy_files: None,
293                        commit_prompt: None,
294                    };
295                    // Ensure existing repo has a main branch if it's empty
296                    if let Err(e) = self.git().ensure_main_branch_exists(&repo.path) {
297                        tracing::error!("Failed to ensure main branch exists: {}", e);
298                        continue;
299                    }
300
301                    // Create project (ignore individual failures)
302                    let project_id = Uuid::new_v4();
303                    match Project::create(&self.db().pool, &create_data, project_id).await {
304                        Ok(project) => {
305                            tracing::info!(
306                                "Auto-created project '{}' from {}",
307                                create_data.name,
308                                create_data.git_repo_path
309                            );
310
311                            // Track project creation event
312                            self.track_if_analytics_allowed(
313                                "project_created",
314                                serde_json::json!({
315                                    "project_id": project.id.to_string(),
316                                    "use_existing_repo": create_data.use_existing_repo,
317                                    "has_setup_script": create_data.setup_script.is_some(),
318                                    "has_dev_script": create_data.dev_script.is_some(),
319                                    "trigger": "auto_setup",
320                                }),
321                            )
322                            .await;
323                        }
324                        Err(e) => {
325                            tracing::warn!(
326                                "Failed to auto-create project '{}': {}",
327                                create_data.name,
328                                e
329                            );
330                        }
331                    }
332                }
333            }
334        }
335    }
336
337    async fn stream_events(
338        &self,
339    ) -> futures::stream::BoxStream<'static, Result<Event, std::io::Error>> {
340        self.events()
341            .msg_store()
342            .history_plus_stream()
343            .map_ok(|m| m.to_sse_event())
344            .boxed()
345    }
346}