1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::Error as AnyhowError;
4use async_trait::async_trait;
5use axum::response::sse::Event;
6use forge_core_db::{
7 DBService,
8 models::{
9 execution_process::{ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus},
10 project::{CreateProject, Project},
11 task::{Task, TaskStatus},
12 task_attempt::{TaskAttempt, TaskAttemptError},
13 },
14};
15use forge_core_executors::executors::ExecutorError;
16use forge_core_services::services::{
17 analytics::{AnalyticsContext, AnalyticsService},
18 approvals::Approvals,
19 auth::{AuthError, AuthService},
20 config::{Config, ConfigError},
21 container::{ContainerError, ContainerService},
22 drafts::DraftsService,
23 events::{EventError, EventService},
24 file_search_cache::FileSearchCache,
25 filesystem::{FilesystemError, FilesystemService},
26 filesystem_watcher::FilesystemWatcherError,
27 forge_config::ForgeConfigService,
28 git::{GitService, GitServiceError},
29 image::{ImageError, ImageService},
30 omni::OmniService,
31 pr_monitor::PrMonitorService,
32 profile_loader::ProfileCacheManager,
33 worktree_manager::WorktreeError,
34};
35use forge_core_utils::{msg_store::MsgStore, sentry as sentry_utils};
36use futures::{StreamExt, TryStreamExt};
37use git2::Error as Git2Error;
38use serde_json::Value;
39use sqlx::{Error as SqlxError, types::Uuid};
40use thiserror::Error;
41use tokio::sync::RwLock;
42
43#[derive(Debug, Error)]
44pub enum DeploymentError {
45 #[error(transparent)]
46 Io(#[from] std::io::Error),
47 #[error(transparent)]
48 Sqlx(#[from] SqlxError),
49 #[error(transparent)]
50 Git2(#[from] Git2Error),
51 #[error(transparent)]
52 GitServiceError(#[from] GitServiceError),
53 #[error(transparent)]
54 FilesystemWatcherError(#[from] FilesystemWatcherError),
55 #[error(transparent)]
56 TaskAttempt(#[from] TaskAttemptError),
57 #[error(transparent)]
58 Container(#[from] ContainerError),
59 #[error(transparent)]
60 Executor(#[from] ExecutorError),
61 #[error(transparent)]
62 Auth(#[from] AuthError),
63 #[error(transparent)]
64 Image(#[from] ImageError),
65 #[error(transparent)]
66 Filesystem(#[from] FilesystemError),
67 #[error(transparent)]
68 Worktree(#[from] WorktreeError),
69 #[error(transparent)]
70 Event(#[from] EventError),
71 #[error(transparent)]
72 Config(#[from] ConfigError),
73 #[error(transparent)]
74 Other(#[from] AnyhowError),
75}
76
77#[async_trait]
78pub trait Deployment: Clone + Send + Sync + 'static {
79 async fn new() -> Result<Self, DeploymentError>;
80
81 fn user_id(&self) -> &str;
82
83 fn shared_types() -> Vec<String>;
84
85 fn config(&self) -> &Arc<RwLock<Config>>;
86
87 fn db(&self) -> &DBService;
88
89 fn analytics(&self) -> &Option<AnalyticsService>;
90
91 fn container(&self) -> &impl ContainerService;
92
93 fn auth(&self) -> &AuthService;
94
95 fn git(&self) -> &GitService;
96
97 fn image(&self) -> &ImageService;
98
99 fn filesystem(&self) -> &FilesystemService;
100
101 fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>;
102
103 fn events(&self) -> &EventService;
104
105 fn file_search_cache(&self) -> &Arc<FileSearchCache>;
106
107 fn approvals(&self) -> &Approvals;
108
109 fn drafts(&self) -> &DraftsService;
110
111 fn forge_config(&self) -> &ForgeConfigService;
112
113 fn omni(&self) -> &Arc<RwLock<OmniService>>;
114
115 fn profile_cache(&self) -> &ProfileCacheManager;
116
117 async fn update_sentry_scope(&self) -> Result<(), DeploymentError> {
118 let user_id = self.user_id();
119 let config = self.config().read().await;
120 let username = config.github.username.as_deref();
121 let email = config.github.primary_email.as_deref();
122 sentry_utils::configure_user_scope(user_id, username, email);
123
124 Ok(())
125 }
126
127 async fn spawn_pr_monitor_service(&self) -> tokio::task::JoinHandle<()> {
128 let db = self.db().clone();
129 let config = self.config().clone();
130 let analytics = self
131 .analytics()
132 .as_ref()
133 .map(|analytics_service| AnalyticsContext {
134 user_id: self.user_id().to_string(),
135 analytics_service: analytics_service.clone(),
136 });
137 PrMonitorService::spawn(db, config, analytics).await
138 }
139
140 async fn track_if_analytics_allowed(&self, event_name: &str, properties: Value) {
141 let analytics_enabled = self.config().read().await.analytics_enabled;
142 if analytics_enabled != Some(false)
145 && let Some(analytics) = self.analytics()
146 {
147 analytics.track_event(self.user_id(), event_name, Some(properties.clone()));
148 }
149 }
150
151 async fn cleanup_orphan_executions(&self) -> Result<(), DeploymentError> {
153 let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
154 for process in running_processes {
155 tracing::info!(
156 "Found orphaned execution process {} for task attempt {:?}",
157 process.id,
158 process.task_attempt_id
159 );
160 if let Err(e) = ExecutionProcess::update_completion(
162 &self.db().pool,
163 process.id,
164 ExecutionProcessStatus::Failed,
165 None, )
167 .await
168 {
169 tracing::error!(
170 "Failed to update orphaned execution process {} status: {}",
171 process.id,
172 e
173 );
174 continue;
175 }
176 if let Some(attempt_id) = process.task_attempt_id
178 && let Ok(Some(task_attempt)) =
179 TaskAttempt::find_by_id(&self.db().pool, attempt_id).await
180 && let Some(container_ref) = task_attempt.container_ref
181 {
182 let wt = std::path::PathBuf::from(container_ref);
183 if let Ok(head) = self.git().get_head_info(&wt) {
184 let _ = ExecutionProcess::update_after_head_commit(
185 &self.db().pool,
186 process.id,
187 &head.oid,
188 )
189 .await;
190 }
191 }
192 tracing::info!("Marked orphaned execution process {} as failed", process.id);
194 if matches!(
196 process.run_reason,
197 ExecutionProcessRunReason::CodingAgent
198 | ExecutionProcessRunReason::SetupScript
199 | ExecutionProcessRunReason::CleanupScript
200 ) && let Some(attempt_id) = process.task_attempt_id
201 && let Ok(Some(task_attempt)) =
202 TaskAttempt::find_by_id(&self.db().pool, attempt_id).await
203 && let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
204 && let Err(e) =
205 Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await
206 {
207 tracing::error!(
208 "Failed to update task status to InReview for orphaned attempt: {}",
209 e
210 );
211 }
212 }
213 Ok(())
214 }
215
216 async fn backfill_before_head_commits(&self) -> Result<(), DeploymentError> {
222 let pool = &self.db().pool;
223 let rows = ExecutionProcess::list_missing_before_context(pool).await?;
224 for row in rows {
225 let mut before = row.prev_after_head_commit.clone();
228
229 if before.is_none() {
231 let repo_path =
232 std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
233 match self
234 .git()
235 .get_branch_oid(repo_path, row.target_branch.as_str())
236 {
237 Ok(oid) => before = Some(oid),
238 Err(e) => {
239 tracing::warn!(
240 "Backfill: Failed to resolve base branch OID for attempt {:?} (branch {}): {}",
241 row.task_attempt_id,
242 row.target_branch,
243 e
244 );
245 }
246 }
247 }
248
249 if let Some(before_oid) = before
250 && let Err(e) =
251 ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
252 {
253 tracing::warn!(
254 "Backfill: Failed to update before_head_commit for process {}: {}",
255 row.id,
256 e
257 );
258 }
259 }
260
261 Ok(())
262 }
263
264 async fn trigger_auto_project_setup(&self) {
266 let soft_timeout_ms = 2_000;
268 let hard_timeout_ms = 2_300;
270 let project_count = Project::count(&self.db().pool).await.unwrap_or(0);
271
272 if project_count == 0 {
274 if let Ok(repos) = self
276 .filesystem()
277 .list_common_git_repos(soft_timeout_ms, hard_timeout_ms, Some(4))
278 .await
279 {
280 for repo in repos.into_iter().take(3) {
282 let project_name = repo.name;
284
285 let create_data = CreateProject {
286 name: project_name,
287 git_repo_path: repo.path.to_string_lossy().to_string(),
288 use_existing_repo: true,
289 setup_script: None,
290 dev_script: None,
291 cleanup_script: None,
292 copy_files: None,
293 commit_prompt: None,
294 };
295 if let Err(e) = self.git().ensure_main_branch_exists(&repo.path) {
297 tracing::error!("Failed to ensure main branch exists: {}", e);
298 continue;
299 }
300
301 let project_id = Uuid::new_v4();
303 match Project::create(&self.db().pool, &create_data, project_id).await {
304 Ok(project) => {
305 tracing::info!(
306 "Auto-created project '{}' from {}",
307 create_data.name,
308 create_data.git_repo_path
309 );
310
311 self.track_if_analytics_allowed(
313 "project_created",
314 serde_json::json!({
315 "project_id": project.id.to_string(),
316 "use_existing_repo": create_data.use_existing_repo,
317 "has_setup_script": create_data.setup_script.is_some(),
318 "has_dev_script": create_data.dev_script.is_some(),
319 "trigger": "auto_setup",
320 }),
321 )
322 .await;
323 }
324 Err(e) => {
325 tracing::warn!(
326 "Failed to auto-create project '{}': {}",
327 create_data.name,
328 e
329 );
330 }
331 }
332 }
333 }
334 }
335 }
336
337 async fn stream_events(
338 &self,
339 ) -> futures::stream::BoxStream<'static, Result<Event, std::io::Error>> {
340 self.events()
341 .msg_store()
342 .history_plus_stream()
343 .map_ok(|m| m.to_sse_event())
344 .boxed()
345 }
346}