1use std::{
4 fs,
5 path::{Path, PathBuf},
6 sync::Arc,
7 time::Duration,
8};
9
10use tokio::{sync::Mutex, task::JoinHandle};
11use tokio_util::sync::CancellationToken;
12use uuid::Uuid;
13
14use crate::{
15 branch::{lifecycle::BranchLifecycle, store::BranchStore},
16 commit::{cherry::CherryPick, selective::SelectiveCommit, EntitySelection},
17 config::BranchConfig,
18 dag::graph::DagGraph,
19 diff::extractor::DiffExtractor,
20 error::{BranchError, BranchResult},
21 merge::{
22 resolver::ConflictResolver,
23 strategies::MergeStrategy,
24 three_way::{MergePreview, ThreeWayMerger},
25 },
26 metrics::{reporter::MetricsReporter, tracker::MetricsTracker},
27 sandbox::{
28 environment::{SimulationEnvironment, SimulationScenario},
29 evaluator::{EvaluationReport, SandboxEvaluator},
30 runner::SandboxRunner,
31 },
32 snapshot::{
33 copier::{cleanup_incomplete_tmp_files, SnapshotCopier},
34 gc::{GcReport, SnapshotGc},
35 },
36 types::{
37 Branch, BranchMetrics, BranchStatus, CommitResult, DiffResult, MergeResult, WorkspaceReport,
38 },
39};
40use thiserror::Error;
41
42#[derive(Debug, Error)]
44pub enum BranchConfigError {
45 #[error("missing required field: workspace_id")]
47 MissingWorkspaceId,
48 #[error("missing required field: branches_dir")]
50 MissingBranchesDir,
51 #[error("missing required field: trunk_source_db")]
53 MissingTrunkSourceDb,
54 #[error(transparent)]
56 Branch(#[from] BranchError),
57}
58
59#[derive(Debug, Default, Clone)]
61pub struct BranchEngineBuilder {
62 workspace_id: Option<Uuid>,
63 branches_dir: Option<PathBuf>,
64 trunk_source_db: Option<PathBuf>,
65 max_branches: Option<usize>,
66 gc_interval_secs: Option<u64>,
67}
68
69impl BranchEngineBuilder {
70 pub fn new() -> Self {
72 Self::default()
73 }
74
75 pub fn workspace_id(mut self, workspace_id: Uuid) -> Self {
77 self.workspace_id = Some(workspace_id);
78 self
79 }
80
81 pub fn branches_dir<P: Into<PathBuf>>(mut self, branches_dir: P) -> Self {
83 self.branches_dir = Some(branches_dir.into());
84 self
85 }
86
87 pub fn trunk_source_db<P: Into<PathBuf>>(mut self, trunk_source_db: P) -> Self {
89 self.trunk_source_db = Some(trunk_source_db.into());
90 self
91 }
92
93 pub fn max_branches(mut self, max_branches: usize) -> Self {
95 self.max_branches = Some(max_branches);
96 self
97 }
98
99 pub fn gc_interval_secs(mut self, gc_interval_secs: u64) -> Self {
101 self.gc_interval_secs = Some(gc_interval_secs);
102 self
103 }
104
105 pub async fn build(self) -> Result<BranchEngine, BranchConfigError> {
107 let workspace_id = self
108 .workspace_id
109 .ok_or(BranchConfigError::MissingWorkspaceId)?;
110 let branches_dir = self
111 .branches_dir
112 .ok_or(BranchConfigError::MissingBranchesDir)?;
113 let trunk_source_db = self
114 .trunk_source_db
115 .ok_or(BranchConfigError::MissingTrunkSourceDb)?;
116
117 let mut builder = BranchConfig::builder()
118 .workspace_id(workspace_id)
119 .branches_dir(branches_dir);
120 if let Some(max) = self.max_branches {
121 builder = builder.max_branches_per_workspace(max);
122 }
123 if let Some(gc_interval) = self.gc_interval_secs {
124 builder = builder.gc_interval_secs(gc_interval);
125 }
126
127 let config = builder.build().map_err(BranchConfigError::Branch)?;
128 BranchEngine::new(config, &trunk_source_db)
129 .await
130 .map_err(BranchConfigError::Branch)
131 }
132}
133
134#[derive(Clone)]
164pub struct BranchEngine {
165 config: Arc<BranchConfig>,
166 store: Arc<BranchStore>,
167 dag: Arc<DagGraph>,
168 lifecycle: Arc<BranchLifecycle>,
169 metrics: Arc<MetricsTracker>,
170 gc: SnapshotGc,
171 gc_scheduler: Arc<Mutex<Option<GcScheduler>>>,
172}
173
174struct GcScheduler {
175 cancellation_token: CancellationToken,
176 task_handle: JoinHandle<()>,
177}
178
179impl BranchEngine {
180 #[tracing::instrument(skip(config), fields(workspace_id = %config.workspace_id))]
186 pub async fn new(config: BranchConfig, trunk_db_path: &Path) -> BranchResult<Self> {
187 validate_branches_dir(&config)?;
188 cleanup_incomplete_tmp_files(&config.branches_dir).await?;
189 let config = Arc::new(config);
190 let store = Arc::new(BranchStore::new(&config.registry_db_path).await?);
191 let dag = Arc::new(DagGraph::new(Arc::clone(&config)));
192 let copier = Arc::new(SnapshotCopier::new(Arc::clone(&config)));
193 let lifecycle = Arc::new(BranchLifecycle::new(
194 Arc::clone(&store),
195 Arc::clone(&copier),
196 Arc::clone(&dag),
197 Arc::clone(&config),
198 ));
199 let metrics = Arc::new(MetricsTracker::new(Arc::clone(&store), Arc::clone(&config)));
200 let gc = SnapshotGc::new(Arc::clone(&config), Arc::clone(&store));
201
202 let engine = Self {
203 config,
204 store,
205 dag,
206 lifecycle,
207 metrics,
208 gc,
209 gc_scheduler: Arc::new(Mutex::new(None)),
210 };
211
212 if engine.trunk_opt().await?.is_none() {
214 engine
215 .lifecycle
216 .create_trunk(engine.config.workspace_id, trunk_db_path)
217 .await?;
218 }
219
220 Ok(engine)
221 }
222
223 #[tracing::instrument(skip(config), fields(workspace_id = %config.workspace_id))]
225 pub async fn open(config: BranchConfig) -> BranchResult<Self> {
226 validate_branches_dir(&config)?;
227 cleanup_incomplete_tmp_files(&config.branches_dir).await?;
228 let config = Arc::new(config);
229 let store = Arc::new(BranchStore::new(&config.registry_db_path).await?);
230 let dag = Arc::new(DagGraph::new(Arc::clone(&config)));
231 let copier = Arc::new(SnapshotCopier::new(Arc::clone(&config)));
232 let lifecycle = Arc::new(BranchLifecycle::new(
233 Arc::clone(&store),
234 Arc::clone(&copier),
235 Arc::clone(&dag),
236 Arc::clone(&config),
237 ));
238 let metrics = Arc::new(MetricsTracker::new(Arc::clone(&store), Arc::clone(&config)));
239 let gc = SnapshotGc::new(Arc::clone(&config), Arc::clone(&store));
240
241 Ok(Self {
242 config,
243 store,
244 dag,
245 lifecycle,
246 metrics,
247 gc,
248 gc_scheduler: Arc::new(Mutex::new(None)),
249 })
250 }
251
252 pub fn config(&self) -> &BranchConfig {
256 &self.config
257 }
258
259 pub fn store(&self) -> Arc<BranchStore> {
261 Arc::clone(&self.store)
262 }
263
264 pub fn dag(&self) -> Arc<DagGraph> {
266 Arc::clone(&self.dag)
267 }
268
269 pub fn lifecycle(&self) -> Arc<BranchLifecycle> {
271 Arc::clone(&self.lifecycle)
272 }
273
274 #[tracing::instrument(skip(self))]
278 pub async fn get(&self, id: Uuid) -> BranchResult<Branch> {
279 let branch = self.store.get(self.config.workspace_id, id).await?;
280 self.ensure_workspace_access(&branch)?;
281 Ok(branch)
282 }
283
284 #[tracing::instrument(skip(self))]
286 pub async fn get_by_name(&self, name: &str) -> BranchResult<Branch> {
287 self.store.get_by_name(self.config.workspace_id, name).await
288 }
289
290 #[tracing::instrument(skip(self))]
292 pub async fn list(&self, status: Option<BranchStatus>) -> BranchResult<Vec<Branch>> {
293 let branches = self.store.list(self.config.workspace_id, status).await?;
294 for branch in &branches {
295 self.ensure_workspace_access(branch)?;
296 }
297 Ok(branches)
298 }
299
300 #[tracing::instrument(skip(self))]
302 pub async fn trunk(&self) -> BranchResult<Branch> {
303 self.trunk_opt()
304 .await?
305 .ok_or_else(|| BranchError::NamingError("trunk branch not found".to_string()))
306 }
307
308 #[tracing::instrument(skip(self))]
310 pub async fn fork(
311 &self,
312 parent_id: Uuid,
313 name: &str,
314 description: Option<&str>,
315 ) -> BranchResult<Branch> {
316 let parent = self.store.get(self.config.workspace_id, parent_id).await?;
317 self.ensure_workspace_access(&parent)?;
318 self.lifecycle.fork(parent_id, name, description).await
319 }
320
321 #[tracing::instrument(skip(self))]
323 pub async fn fork_trunk(&self, name: &str) -> BranchResult<Branch> {
324 crate::branch::naming::NamingValidator::validate(name)?;
325 let trunk = self.trunk().await?;
326 self.lifecycle.fork(trunk.id, name, None).await
327 }
328
329 #[tracing::instrument(skip(self))]
331 pub async fn discard(&self, id: Uuid) -> BranchResult<()> {
332 let branch = self.store.get(self.config.workspace_id, id).await?;
333 self.ensure_workspace_access(&branch)?;
334 self.lifecycle.discard(id).await
335 }
336
337 #[tracing::instrument(skip(self))]
339 pub async fn archive(&self, id: Uuid) -> BranchResult<()> {
340 let branch = self.store.get(self.config.workspace_id, id).await?;
341 self.ensure_workspace_access(&branch)?;
342 self.lifecycle.archive(id).await
343 }
344
345 #[tracing::instrument(skip(self))]
349 pub async fn diff(&self, a: Uuid, b: Uuid) -> BranchResult<DiffResult> {
350 let branch_a = self.store.get(self.config.workspace_id, a).await?;
351 let branch_b = self.store.get(self.config.workspace_id, b).await?;
352 self.ensure_workspace_access(&branch_a)?;
353 self.ensure_workspace_access(&branch_b)?;
354 let extractor = DiffExtractor::new(Arc::clone(&self.config));
355 extractor.diff(&branch_a, &branch_b, None).await
356 }
357
358 #[tracing::instrument(skip(self))]
360 pub async fn compare_branches(&self, a: Uuid, b: Uuid) -> BranchResult<DiffResult> {
361 self.diff(a, b).await
362 }
363
364 #[tracing::instrument(skip(self))]
371 pub async fn merge(
372 &self,
373 source: Uuid,
374 target: Uuid,
375 strategy: MergeStrategy,
376 ) -> BranchResult<MergeResult> {
377 let source_branch = self.store.get(self.config.workspace_id, source).await?;
378 let target_branch = self.store.get(self.config.workspace_id, target).await?;
379 self.ensure_workspace_access(&source_branch)?;
380 self.ensure_workspace_access(&target_branch)?;
381 let base_id = source_branch.parent_id.unwrap_or(target);
382 let base_branch = self.store.get(self.config.workspace_id, base_id).await?;
383 self.ensure_workspace_access(&base_branch)?;
384 let resolver = Arc::new(ConflictResolver);
385 let merger = ThreeWayMerger::new(resolver, Arc::clone(&self.config));
386 merger
387 .merge(
388 &base_branch,
389 &source_branch,
390 &target_branch,
391 &strategy,
392 None,
393 )
394 .await
395 }
396
397 #[tracing::instrument(skip(self))]
399 pub async fn merge_preview(&self, source: Uuid, target: Uuid) -> BranchResult<MergePreview> {
400 let source_branch = self.store.get(self.config.workspace_id, source).await?;
401 let target_branch = self.store.get(self.config.workspace_id, target).await?;
402 self.ensure_workspace_access(&source_branch)?;
403 self.ensure_workspace_access(&target_branch)?;
404 let base_id = source_branch.parent_id.unwrap_or(target);
405 let base_branch = self.store.get(self.config.workspace_id, base_id).await?;
406 self.ensure_workspace_access(&base_branch)?;
407 let resolver = Arc::new(ConflictResolver);
408 let merger = ThreeWayMerger::new(resolver, Arc::clone(&self.config));
409 merger
410 .preview(&base_branch, &source_branch, &target_branch, None)
411 .await
412 }
413
414 #[tracing::instrument(skip(self))]
418 pub async fn commit(&self, cherry: CherryPick) -> BranchResult<CommitResult> {
419 let source = self
420 .store
421 .get(self.config.workspace_id, cherry.source_branch_id)
422 .await?;
423 let target = self
424 .store
425 .get(self.config.workspace_id, cherry.target_branch_id)
426 .await?;
427 self.ensure_workspace_access(&source)?;
428 self.ensure_workspace_access(&target)?;
429 let committer = SelectiveCommit::from_store(
430 Arc::clone(&self.store),
431 cherry.source_branch_id,
432 cherry.target_branch_id,
433 self.config.workspace_id,
434 )
435 .await?;
436 committer.commit(&cherry).await
437 }
438
439 #[tracing::instrument(skip(self, selections))]
441 pub async fn cherry_pick(
442 &self,
443 source_id: Uuid,
444 target_id: Uuid,
445 selections: Vec<EntitySelection>,
446 message: Option<String>,
447 ) -> BranchResult<CommitResult> {
448 self.commit(CherryPick {
449 source_branch_id: source_id,
450 target_branch_id: target_id,
451 entity_selections: selections,
452 message,
453 })
454 .await
455 }
456
457 #[tracing::instrument(skip(self))]
459 pub async fn commit_to_trunk(&self, source_id: Uuid) -> BranchResult<CommitResult> {
460 let source = self.store.get(self.config.workspace_id, source_id).await?;
461 self.ensure_workspace_access(&source)?;
462 let trunk = self.trunk().await?;
463 let committer = SelectiveCommit::from_store(
464 Arc::clone(&self.store),
465 source_id,
466 trunk.id,
467 self.config.workspace_id,
468 )
469 .await?;
470 committer.commit_all(source_id, trunk.id).await
471 }
472
473 #[tracing::instrument(skip(self, agent_fn))]
479 pub async fn simulate<F, Fut>(
480 &self,
481 parent_id: Uuid,
482 scenario: SimulationScenario,
483 agent_fn: F,
484 ) -> BranchResult<EvaluationReport>
485 where
486 F: FnOnce(sqlx::SqlitePool) -> Fut,
487 Fut: std::future::Future<Output = BranchResult<serde_json::Value>>,
488 {
489 let parent = self.store.get(self.config.workspace_id, parent_id).await?;
490 self.ensure_workspace_access(&parent)?;
491 let env = SimulationEnvironment::setup(
492 &parent,
493 scenario,
494 Arc::clone(&self.config),
495 Arc::clone(&self.lifecycle),
496 )
497 .await?;
498
499 let mut runner = SandboxRunner::new(env, Arc::clone(&self.config));
500 let _ = runner.run(agent_fn).await?;
501
502 let evaluator = SandboxEvaluator;
503 evaluator
504 .evaluate(&runner.env, &parent, Arc::clone(&self.config))
505 .await
506 }
507
508 #[tracing::instrument(skip(self))]
512 pub async fn lineage(&self, branch_id: Uuid) -> BranchResult<Vec<Uuid>> {
513 let mut ancestors = self.dag.ancestors_of(branch_id)?;
514 ancestors.reverse();
515 ancestors.push(branch_id);
516 Ok(ancestors)
517 }
518
519 #[tracing::instrument(skip(self))]
521 pub async fn dag_dot(&self) -> BranchResult<String> {
522 crate::dag::dot::export_dot(&self.dag, &self.store).await
523 }
524
525 #[tracing::instrument(skip(self))]
529 pub async fn metrics(&self, branch_id: Uuid) -> BranchResult<BranchMetrics> {
530 let branch = self.store.get(self.config.workspace_id, branch_id).await?;
531 self.metrics.refresh(&branch).await
532 }
533
534 #[tracing::instrument(skip(self))]
536 pub async fn workspace_report(&self) -> BranchResult<WorkspaceReport> {
537 MetricsReporter
538 .workspace_report(self.config.workspace_id, &self.store)
539 .await
540 }
541
542 #[tracing::instrument(skip(self))]
546 pub async fn gc(&self) -> BranchResult<GcReport> {
547 self.gc.run().await
548 }
549
550 #[tracing::instrument(skip(self))]
552 pub async fn start_gc_scheduler(&self) -> BranchResult<()> {
553 let mut guard = self.gc_scheduler.lock().await;
554 if guard.is_some() {
555 return Ok(());
556 }
557
558 let cancellation_token = CancellationToken::new();
559 let child_token = cancellation_token.clone();
560 let engine = self.clone();
561 let interval_seconds = self.config.gc_interval_secs.max(1);
562
563 let task_handle = tokio::spawn(async move {
564 let mut interval = tokio::time::interval(Duration::from_secs(interval_seconds));
565 loop {
566 tokio::select! {
567 _ = child_token.cancelled() => break,
568 _ = interval.tick() => {
569 let _ = engine.gc().await;
570 }
571 }
572 }
573 });
574
575 *guard = Some(GcScheduler {
576 cancellation_token,
577 task_handle,
578 });
579 Ok(())
580 }
581
582 #[tracing::instrument(skip(self))]
584 pub async fn stop_gc_scheduler(&self) -> BranchResult<()> {
585 let scheduler = {
586 let mut guard = self.gc_scheduler.lock().await;
587 guard.take()
588 };
589
590 if let Some(scheduler) = scheduler {
591 scheduler.cancellation_token.cancel();
592 scheduler
593 .task_handle
594 .await
595 .map_err(|error| BranchError::SandboxError(error.to_string()))?;
596 }
597
598 Ok(())
599 }
600
601 #[tracing::instrument(skip(self))]
603 pub async fn shutdown(&self) -> BranchResult<()> {
604 self.stop_gc_scheduler().await
605 }
606
607 async fn trunk_opt(&self) -> BranchResult<Option<Branch>> {
610 match self
611 .store
612 .get_by_slug(self.config.workspace_id, &self.config.trunk_branch_name)
613 .await
614 {
615 Ok(b) => Ok(Some(b)),
616 Err(BranchError::BranchNotFound(_)) => Ok(None),
617 Err(BranchError::BranchAlreadyExists(name))
620 if name == self.config.trunk_branch_name =>
621 {
622 Ok(None)
623 }
624 Err(e) => Err(e),
625 }
626 }
627
628 fn ensure_workspace_access(&self, branch: &Branch) -> BranchResult<()> {
629 if branch.workspace_id != self.config.workspace_id {
630 return Err(BranchError::WorkspaceIsolationViolation {
631 expected: self.config.workspace_id,
632 found: branch.workspace_id,
633 });
634 }
635 Ok(())
636 }
637}
638
639fn validate_branches_dir(config: &BranchConfig) -> BranchResult<()> {
640 fs::create_dir_all(&config.branches_dir)?;
641 let expected_root = fs::canonicalize(
642 config
643 .branches_dir
644 .parent()
645 .unwrap_or_else(|| Path::new(".")),
646 )?;
647 let resolved_dir = fs::canonicalize(&config.branches_dir)?;
648
649 if !resolved_dir.is_absolute() || !resolved_dir.starts_with(&expected_root) {
650 return Err(BranchError::InvalidConfig(
651 "branches_dir resolves outside expected root".to_string(),
652 ));
653 }
654
655 let mut cursor = resolved_dir.as_path();
656 while cursor.starts_with(&expected_root) && cursor != expected_root {
657 let meta = fs::symlink_metadata(cursor)?;
658 if meta.file_type().is_symlink() {
659 let link_target = fs::canonicalize(cursor)?;
660 if !link_target.starts_with(&expected_root) {
661 return Err(BranchError::InvalidConfig(
662 "branches_dir resolves outside expected root".to_string(),
663 ));
664 }
665 }
666 if let Some(parent) = cursor.parent() {
667 cursor = parent;
668 } else {
669 break;
670 }
671 }
672
673 Ok(())
674}