1use super::events::{Event, EventPayload};
6use super::flow::{FlowState, RetryMode, TaskExecState, TaskExecution, TaskFlow};
7use super::graph::{GraphState, TaskGraph};
8use super::scope::{RepoAccessMode, Scope};
9use super::verification::CheckResult;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use uuid::Uuid;
14
/// Serde fallback used for `ProjectRuntimeConfig::max_parallel_tasks` when the
/// field is missing from the serialized form.
///
/// Always returns `1`.
const fn default_max_parallel_tasks() -> u16 {
    const FALLBACK: u16 = 1;
    FALLBACK
}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum AttemptCheckpointState {
22 Declared,
23 Active,
24 Completed,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct AttemptCheckpoint {
29 pub checkpoint_id: String,
30 pub order: u32,
31 pub total: u32,
32 pub state: AttemptCheckpointState,
33 #[serde(default)]
34 pub commit_hash: Option<String>,
35 #[serde(default)]
36 pub completed_at: Option<DateTime<Utc>>,
37 #[serde(default)]
38 pub summary: Option<String>,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct AttemptState {
43 pub id: Uuid,
44 pub flow_id: Uuid,
45 pub task_id: Uuid,
46 pub attempt_number: u32,
47 pub started_at: DateTime<Utc>,
48 pub baseline_id: Option<Uuid>,
49 pub diff_id: Option<Uuid>,
50 #[serde(default)]
51 pub check_results: Vec<CheckResult>,
52 #[serde(default)]
53 pub checkpoints: Vec<AttemptCheckpoint>,
54 #[serde(default)]
55 pub all_checkpoints_completed: bool,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct ProjectRuntimeConfig {
60 pub adapter_name: String,
61 pub binary_path: String,
62 #[serde(default)]
63 pub model: Option<String>,
64 #[serde(default)]
65 pub args: Vec<String>,
66 #[serde(default)]
67 pub env: HashMap<String, String>,
68 pub timeout_ms: u64,
69 #[serde(default = "default_max_parallel_tasks")]
70 pub max_parallel_tasks: u16,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct TaskRuntimeConfig {
75 pub adapter_name: String,
76 pub binary_path: String,
77 #[serde(default)]
78 pub model: Option<String>,
79 #[serde(default)]
80 pub args: Vec<String>,
81 #[serde(default)]
82 pub env: HashMap<String, String>,
83 pub timeout_ms: u64,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct Project {
89 pub id: Uuid,
90 pub name: String,
91 pub description: Option<String>,
92 pub created_at: DateTime<Utc>,
93 pub updated_at: DateTime<Utc>,
94 pub repositories: Vec<Repository>,
95 #[serde(default)]
96 pub runtime: Option<ProjectRuntimeConfig>,
97}
98
99#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101pub struct Repository {
102 pub name: String,
103 pub path: String,
104 #[serde(default)]
105 pub access_mode: RepoAccessMode,
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum TaskState {
112 Open,
113 Closed,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
118pub struct Task {
119 pub id: Uuid,
120 pub project_id: Uuid,
121 pub title: String,
122 pub description: Option<String>,
123 #[serde(default)]
124 pub scope: Option<Scope>,
125 #[serde(default)]
126 pub runtime_override: Option<TaskRuntimeConfig>,
127 pub state: TaskState,
128 pub created_at: DateTime<Utc>,
129 pub updated_at: DateTime<Utc>,
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
134#[serde(rename_all = "lowercase")]
135pub enum MergeStatus {
136 Prepared,
137 Approved,
138 Completed,
139}
140
141#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct MergeState {
144 pub flow_id: Uuid,
145 pub status: MergeStatus,
146 pub target_branch: Option<String>,
147 pub conflicts: Vec<String>,
148 pub commits: Vec<String>,
149 pub updated_at: DateTime<Utc>,
150}
151
152#[derive(Debug, Default, Clone)]
154pub struct AppState {
155 pub projects: HashMap<Uuid, Project>,
156 pub tasks: HashMap<Uuid, Task>,
157 pub graphs: HashMap<Uuid, TaskGraph>,
158 pub flows: HashMap<Uuid, TaskFlow>,
159 pub merge_states: HashMap<Uuid, MergeState>,
160 pub attempts: HashMap<Uuid, AttemptState>,
161}
162
163impl AppState {
164 #[must_use]
166 pub fn new() -> Self {
167 Self::default()
168 }
169
170 #[must_use]
172 pub fn apply(mut self, event: &Event) -> Self {
173 self.apply_mut(event);
174 self
175 }
176
177 #[allow(clippy::too_many_lines)]
179 pub fn apply_mut(&mut self, event: &Event) {
180 let timestamp = event.timestamp();
181
182 match &event.payload {
183 EventPayload::ProjectCreated {
184 id,
185 name,
186 description,
187 } => {
188 self.projects.insert(
189 *id,
190 Project {
191 id: *id,
192 name: name.clone(),
193 description: description.clone(),
194 created_at: timestamp,
195 updated_at: timestamp,
196 repositories: Vec::new(),
197 runtime: None,
198 },
199 );
200 }
201 EventPayload::ProjectUpdated {
202 id,
203 name,
204 description,
205 } => {
206 if let Some(project) = self.projects.get_mut(id) {
207 if let Some(n) = name {
208 n.clone_into(&mut project.name);
209 }
210 if let Some(d) = description {
211 project.description = Some(d.clone());
212 }
213 project.updated_at = timestamp;
214 }
215 }
216
217 EventPayload::TaskExecutionFrozen {
218 flow_id,
219 task_id,
220 commit_sha,
221 } => {
222 if let Some(flow) = self.flows.get_mut(flow_id) {
223 if let Some(exec) = flow.task_executions.get_mut(task_id) {
224 exec.frozen_commit_sha.clone_from(commit_sha);
225 exec.updated_at = timestamp;
226 flow.updated_at = timestamp;
227 }
228 }
229 }
230
231 EventPayload::TaskIntegratedIntoFlow {
232 flow_id,
233 task_id,
234 commit_sha,
235 } => {
236 if let Some(flow) = self.flows.get_mut(flow_id) {
237 if let Some(exec) = flow.task_executions.get_mut(task_id) {
238 exec.integrated_commit_sha.clone_from(commit_sha);
239 exec.updated_at = timestamp;
240 flow.updated_at = timestamp;
241 }
242 }
243 }
244 EventPayload::ProjectRuntimeConfigured {
245 project_id,
246 adapter_name,
247 binary_path,
248 model,
249 args,
250 env,
251 timeout_ms,
252 max_parallel_tasks,
253 } => {
254 if let Some(project) = self.projects.get_mut(project_id) {
255 project.runtime = Some(ProjectRuntimeConfig {
256 adapter_name: adapter_name.clone(),
257 binary_path: binary_path.clone(),
258 model: model.clone(),
259 args: args.clone(),
260 env: env.clone(),
261 timeout_ms: *timeout_ms,
262 max_parallel_tasks: *max_parallel_tasks,
263 });
264 project.updated_at = timestamp;
265 }
266 }
267 EventPayload::TaskCreated {
268 id,
269 project_id,
270 title,
271 description,
272 scope,
273 } => {
274 self.tasks.insert(
275 *id,
276 Task {
277 id: *id,
278 project_id: *project_id,
279 title: title.clone(),
280 description: description.clone(),
281 scope: scope.clone(),
282 runtime_override: None,
283 state: TaskState::Open,
284 created_at: timestamp,
285 updated_at: timestamp,
286 },
287 );
288 }
289 EventPayload::TaskUpdated {
290 id,
291 title,
292 description,
293 } => {
294 if let Some(task) = self.tasks.get_mut(id) {
295 if let Some(t) = title {
296 t.clone_into(&mut task.title);
297 }
298 if let Some(d) = description {
299 task.description = Some(d.clone());
300 }
301 task.updated_at = timestamp;
302 }
303 }
304 EventPayload::TaskRuntimeConfigured {
305 task_id,
306 adapter_name,
307 binary_path,
308 model,
309 args,
310 env,
311 timeout_ms,
312 } => {
313 if let Some(task) = self.tasks.get_mut(task_id) {
314 task.runtime_override = Some(TaskRuntimeConfig {
315 adapter_name: adapter_name.clone(),
316 binary_path: binary_path.clone(),
317 model: model.clone(),
318 args: args.clone(),
319 env: env.clone(),
320 timeout_ms: *timeout_ms,
321 });
322 task.updated_at = timestamp;
323 }
324 }
325 EventPayload::TaskRuntimeCleared { task_id } => {
326 if let Some(task) = self.tasks.get_mut(task_id) {
327 task.runtime_override = None;
328 task.updated_at = timestamp;
329 }
330 }
331 EventPayload::TaskClosed { id, reason: _ } => {
332 if let Some(task) = self.tasks.get_mut(id) {
333 task.state = TaskState::Closed;
334 task.updated_at = timestamp;
335 }
336 }
337 EventPayload::RepositoryAttached {
338 project_id,
339 path,
340 name,
341 access_mode,
342 } => {
343 if let Some(project) = self.projects.get_mut(project_id) {
344 project.repositories.push(Repository {
345 name: name.clone(),
346 path: path.clone(),
347 access_mode: *access_mode,
348 });
349 project.updated_at = timestamp;
350 }
351 }
352 EventPayload::RepositoryDetached { project_id, name } => {
353 if let Some(project) = self.projects.get_mut(project_id) {
354 project.repositories.retain(|r| r.name != *name);
355 project.updated_at = timestamp;
356 }
357 }
358
359 EventPayload::TaskGraphCreated {
360 graph_id,
361 project_id,
362 name,
363 description,
364 } => {
365 self.graphs.insert(
366 *graph_id,
367 TaskGraph {
368 id: *graph_id,
369 project_id: *project_id,
370 name: name.clone(),
371 description: description.clone(),
372 state: GraphState::Draft,
373 tasks: HashMap::new(),
374 dependencies: HashMap::<Uuid, HashSet<Uuid>>::new(),
375 created_at: timestamp,
376 updated_at: timestamp,
377 },
378 );
379 }
380 EventPayload::TaskAddedToGraph { graph_id, task } => {
381 if let Some(graph) = self.graphs.get_mut(graph_id) {
382 graph.tasks.insert(task.id, task.clone());
383 graph.dependencies.entry(task.id).or_default();
384 graph.updated_at = timestamp;
385 }
386 }
387 EventPayload::DependencyAdded {
388 graph_id,
389 from_task,
390 to_task,
391 } => {
392 if let Some(graph) = self.graphs.get_mut(graph_id) {
393 graph
394 .dependencies
395 .entry(*to_task)
396 .or_default()
397 .insert(*from_task);
398 graph.updated_at = timestamp;
399 }
400 }
401 EventPayload::GraphTaskCheckAdded {
402 graph_id,
403 task_id,
404 check,
405 } => {
406 if let Some(graph) = self.graphs.get_mut(graph_id) {
407 if let Some(task) = graph.tasks.get_mut(task_id) {
408 task.criteria.checks.push(check.clone());
409 graph.updated_at = timestamp;
410 }
411 }
412 }
413 EventPayload::ScopeAssigned {
414 graph_id,
415 task_id,
416 scope,
417 } => {
418 if let Some(graph) = self.graphs.get_mut(graph_id) {
419 if let Some(task) = graph.tasks.get_mut(task_id) {
420 task.scope = Some(scope.clone());
421 graph.updated_at = timestamp;
422 }
423 }
424 }
425
426 EventPayload::TaskGraphValidated {
427 graph_id,
428 project_id: _,
429 valid,
430 issues: _,
431 } => {
432 if *valid {
433 if let Some(graph) = self.graphs.get_mut(graph_id) {
434 graph.state = GraphState::Validated;
435 graph.updated_at = timestamp;
436 }
437 }
438 }
439
440 EventPayload::TaskGraphLocked {
441 graph_id,
442 project_id: _,
443 } => {
444 if let Some(graph) = self.graphs.get_mut(graph_id) {
445 graph.state = GraphState::Locked;
446 graph.updated_at = timestamp;
447 }
448 }
449 EventPayload::TaskFlowCreated {
450 flow_id,
451 graph_id,
452 project_id,
453 name: _,
454 task_ids,
455 } => {
456 if let Some(graph) = self.graphs.get_mut(graph_id) {
457 graph.state = GraphState::Locked;
458 graph.updated_at = timestamp;
459 }
460
461 let mut task_executions = HashMap::new();
462 for task_id in task_ids {
463 task_executions.insert(
464 *task_id,
465 TaskExecution {
466 task_id: *task_id,
467 state: TaskExecState::Pending,
468 attempt_count: 0,
469 retry_mode: RetryMode::default(),
470 frozen_commit_sha: None,
471 integrated_commit_sha: None,
472 updated_at: timestamp,
473 blocked_reason: None,
474 },
475 );
476 }
477
478 self.flows.insert(
479 *flow_id,
480 TaskFlow {
481 id: *flow_id,
482 graph_id: *graph_id,
483 project_id: *project_id,
484 base_revision: None,
485 state: FlowState::Created,
486 task_executions,
487 created_at: timestamp,
488 started_at: None,
489 completed_at: None,
490 updated_at: timestamp,
491 },
492 );
493 }
494 EventPayload::TaskFlowStarted {
495 flow_id,
496 base_revision,
497 } => {
498 if let Some(flow) = self.flows.get_mut(flow_id) {
499 flow.state = FlowState::Running;
500 flow.started_at = Some(timestamp);
501 flow.base_revision.clone_from(base_revision);
502 flow.updated_at = timestamp;
503 }
504 }
505 EventPayload::TaskFlowPaused {
506 flow_id,
507 running_tasks: _,
508 } => {
509 if let Some(flow) = self.flows.get_mut(flow_id) {
510 flow.state = FlowState::Paused;
511 flow.updated_at = timestamp;
512 }
513 }
514 EventPayload::TaskFlowResumed { flow_id } => {
515 if let Some(flow) = self.flows.get_mut(flow_id) {
516 flow.state = FlowState::Running;
517 flow.updated_at = timestamp;
518 }
519 }
520 EventPayload::TaskFlowCompleted { flow_id } => {
521 if let Some(flow) = self.flows.get_mut(flow_id) {
522 flow.state = FlowState::Completed;
523 flow.completed_at = Some(timestamp);
524 flow.updated_at = timestamp;
525 }
526 }
527
528 EventPayload::FlowFrozenForMerge { flow_id } => {
529 if let Some(flow) = self.flows.get_mut(flow_id) {
530 flow.state = FlowState::FrozenForMerge;
531 flow.updated_at = timestamp;
532 }
533 }
534 EventPayload::TaskFlowAborted {
535 flow_id,
536 reason: _,
537 forced: _,
538 } => {
539 if let Some(flow) = self.flows.get_mut(flow_id) {
540 flow.state = FlowState::Aborted;
541 flow.completed_at = Some(timestamp);
542 flow.updated_at = timestamp;
543 }
544 }
545 EventPayload::TaskReady { flow_id, task_id } => {
546 if let Some(flow) = self.flows.get_mut(flow_id) {
547 if let Some(exec) = flow.task_executions.get_mut(task_id) {
548 exec.state = TaskExecState::Ready;
549 exec.blocked_reason = None;
550 exec.updated_at = timestamp;
551 }
552 flow.updated_at = timestamp;
553 }
554 }
555 EventPayload::TaskBlocked {
556 flow_id,
557 task_id,
558 reason,
559 } => {
560 if let Some(flow) = self.flows.get_mut(flow_id) {
561 if let Some(exec) = flow.task_executions.get_mut(task_id) {
562 exec.state = TaskExecState::Pending;
563 exec.blocked_reason.clone_from(reason);
564 exec.updated_at = timestamp;
565 }
566 flow.updated_at = timestamp;
567 }
568 }
569 EventPayload::TaskExecutionStateChanged {
570 flow_id,
571 task_id,
572 from: _,
573 to,
574 } => {
575 if let Some(flow) = self.flows.get_mut(flow_id) {
576 if let Some(exec) = flow.task_executions.get_mut(task_id) {
577 exec.state = *to;
578 exec.updated_at = timestamp;
579 exec.blocked_reason = None;
580 if *to == TaskExecState::Running {
581 exec.attempt_count += 1;
582 }
583 }
584 flow.updated_at = timestamp;
585 }
586 }
587
588 EventPayload::AttemptStarted {
589 flow_id,
590 task_id,
591 attempt_id,
592 attempt_number,
593 } => {
594 self.attempts.insert(
595 *attempt_id,
596 AttemptState {
597 id: *attempt_id,
598 flow_id: *flow_id,
599 task_id: *task_id,
600 attempt_number: *attempt_number,
601 started_at: timestamp,
602 baseline_id: None,
603 diff_id: None,
604 check_results: Vec::new(),
605 checkpoints: Vec::new(),
606 all_checkpoints_completed: false,
607 },
608 );
609 }
610
611 EventPayload::CheckpointDeclared {
612 attempt_id,
613 checkpoint_id,
614 order,
615 total,
616 ..
617 } => {
618 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
619 let exists = attempt
620 .checkpoints
621 .iter()
622 .any(|cp| cp.checkpoint_id == *checkpoint_id);
623 if !exists {
624 attempt.checkpoints.push(AttemptCheckpoint {
625 checkpoint_id: checkpoint_id.clone(),
626 order: *order,
627 total: *total,
628 state: AttemptCheckpointState::Declared,
629 commit_hash: None,
630 completed_at: None,
631 summary: None,
632 });
633 attempt.checkpoints.sort_by_key(|cp| cp.order);
634 }
635 }
636 }
637
638 EventPayload::CheckpointActivated {
639 attempt_id,
640 checkpoint_id,
641 ..
642 } => {
643 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
644 for cp in &mut attempt.checkpoints {
645 if cp.checkpoint_id == *checkpoint_id {
646 cp.state = AttemptCheckpointState::Active;
647 } else if cp.state != AttemptCheckpointState::Completed {
648 cp.state = AttemptCheckpointState::Declared;
649 }
650 }
651 }
652 }
653
654 EventPayload::CheckpointCompleted {
655 attempt_id,
656 checkpoint_id,
657 commit_hash,
658 timestamp,
659 summary,
660 ..
661 } => {
662 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
663 if let Some(cp) = attempt
664 .checkpoints
665 .iter_mut()
666 .find(|cp| cp.checkpoint_id == *checkpoint_id)
667 {
668 cp.state = AttemptCheckpointState::Completed;
669 cp.commit_hash = Some(commit_hash.clone());
670 cp.completed_at = Some(*timestamp);
671 cp.summary.clone_from(summary);
672 }
673 }
674 }
675
676 EventPayload::AllCheckpointsCompleted { attempt_id, .. } => {
677 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
678 attempt.all_checkpoints_completed = true;
679 }
680 }
681
682 EventPayload::BaselineCaptured {
683 attempt_id,
684 baseline_id,
685 ..
686 } => {
687 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
688 attempt.baseline_id = Some(*baseline_id);
689 }
690 }
691
692 EventPayload::DiffComputed {
693 attempt_id,
694 diff_id,
695 ..
696 } => {
697 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
698 attempt.diff_id = Some(*diff_id);
699 }
700 }
701
702 EventPayload::CheckCompleted {
703 attempt_id,
704 check_name,
705 passed,
706 exit_code,
707 output,
708 duration_ms,
709 required,
710 ..
711 } => {
712 if let Some(attempt) = self.attempts.get_mut(attempt_id) {
713 attempt.check_results.push(CheckResult {
714 name: check_name.clone(),
715 passed: *passed,
716 exit_code: *exit_code,
717 output: output.clone(),
718 duration_ms: *duration_ms,
719 required: *required,
720 });
721 }
722 }
723
724 EventPayload::CheckStarted { .. }
725 | EventPayload::ErrorOccurred { .. }
726 | EventPayload::TaskExecutionStarted { .. }
727 | EventPayload::TaskExecutionSucceeded { .. }
728 | EventPayload::TaskExecutionFailed { .. }
729 | EventPayload::MergeConflictDetected { .. }
730 | EventPayload::MergeCheckStarted { .. }
731 | EventPayload::MergeCheckCompleted { .. }
732 | EventPayload::RuntimeStarted { .. }
733 | EventPayload::RuntimeOutputChunk { .. }
734 | EventPayload::RuntimeInputProvided { .. }
735 | EventPayload::RuntimeInterrupted { .. }
736 | EventPayload::RuntimeExited { .. }
737 | EventPayload::RuntimeTerminated { .. }
738 | EventPayload::RuntimeFilesystemObserved { .. }
739 | EventPayload::RuntimeCommandObserved { .. }
740 | EventPayload::RuntimeToolCallObserved { .. }
741 | EventPayload::RuntimeTodoSnapshotUpdated { .. }
742 | EventPayload::RuntimeNarrativeOutputObserved { .. }
743 | EventPayload::FileModified { .. }
744 | EventPayload::CheckpointCommitCreated { .. }
745 | EventPayload::ScopeValidated { .. }
746 | EventPayload::ScopeViolationDetected { .. }
747 | EventPayload::ScopeConflictDetected { .. }
748 | EventPayload::TaskSchedulingDeferred { .. }
749 | EventPayload::RetryContextAssembled { .. }
750 | EventPayload::FlowIntegrationLockAcquired { .. }
751 | EventPayload::Unknown => {}
752
753 EventPayload::TaskRetryRequested {
754 task_id,
755 reset_count,
756 retry_mode,
757 } => {
758 let flow_id = event.metadata.correlation.flow_id;
759 let mut candidate_flow_ids = Vec::new();
760
761 if let Some(fid) = flow_id {
762 candidate_flow_ids.push(fid);
763 } else {
764 for (fid, flow) in &self.flows {
765 if flow.task_executions.contains_key(task_id) {
766 candidate_flow_ids.push(*fid);
767 }
768 }
769 }
770
771 for fid in candidate_flow_ids {
772 if let Some(flow) = self.flows.get_mut(&fid) {
773 if let Some(exec) = flow.task_executions.get_mut(task_id) {
774 exec.state = TaskExecState::Pending;
775 exec.blocked_reason = None;
776 exec.updated_at = timestamp;
777 exec.retry_mode = *retry_mode;
778 if *reset_count {
779 exec.attempt_count = 0;
780 }
781 flow.updated_at = timestamp;
782 break;
783 }
784 }
785 }
786 }
787
788 EventPayload::TaskAborted { task_id, reason: _ } => {
789 let flow_id = event.metadata.correlation.flow_id;
790 let mut candidate_flow_ids = Vec::new();
791
792 if let Some(fid) = flow_id {
793 candidate_flow_ids.push(fid);
794 } else {
795 for (fid, flow) in &self.flows {
796 if flow.task_executions.contains_key(task_id) {
797 candidate_flow_ids.push(*fid);
798 }
799 }
800 }
801
802 for fid in candidate_flow_ids {
803 if let Some(flow) = self.flows.get_mut(&fid) {
804 if let Some(exec) = flow.task_executions.get_mut(task_id) {
805 exec.state = TaskExecState::Failed;
806 exec.blocked_reason = None;
807 exec.updated_at = timestamp;
808 flow.updated_at = timestamp;
809 break;
810 }
811 }
812 }
813 }
814
815 EventPayload::HumanOverride {
816 task_id,
817 override_type: _,
818 decision,
819 reason: _,
820 user: _,
821 } => {
822 let flow_id = event.metadata.correlation.flow_id;
823 let mut candidate_flow_ids = Vec::new();
824
825 if let Some(fid) = flow_id {
826 candidate_flow_ids.push(fid);
827 } else {
828 for (fid, flow) in &self.flows {
829 if flow.task_executions.contains_key(task_id) {
830 candidate_flow_ids.push(*fid);
831 }
832 }
833 }
834
835 let new_state = if decision == "pass" {
836 TaskExecState::Success
837 } else {
838 TaskExecState::Failed
839 };
840
841 for fid in candidate_flow_ids {
842 if let Some(flow) = self.flows.get_mut(&fid) {
843 if let Some(exec) = flow.task_executions.get_mut(task_id) {
844 exec.state = new_state;
845 exec.blocked_reason = None;
846 exec.updated_at = timestamp;
847 flow.updated_at = timestamp;
848 break;
849 }
850 }
851 }
852 }
853
854 EventPayload::MergePrepared {
855 flow_id,
856 target_branch,
857 conflicts,
858 } => {
859 self.merge_states.insert(
860 *flow_id,
861 MergeState {
862 flow_id: *flow_id,
863 status: MergeStatus::Prepared,
864 target_branch: target_branch.clone(),
865 conflicts: conflicts.clone(),
866 commits: Vec::new(),
867 updated_at: timestamp,
868 },
869 );
870 }
871 EventPayload::MergeApproved { flow_id, user: _ } => {
872 if let Some(ms) = self.merge_states.get_mut(flow_id) {
873 ms.status = MergeStatus::Approved;
874 ms.updated_at = timestamp;
875 }
876 }
877 EventPayload::MergeCompleted { flow_id, commits } => {
878 if let Some(ms) = self.merge_states.get_mut(flow_id) {
879 ms.status = MergeStatus::Completed;
880 commits.clone_into(&mut ms.commits);
881 ms.updated_at = timestamp;
882 }
883
884 if let Some(flow) = self.flows.get_mut(flow_id) {
885 flow.state = FlowState::Merged;
886 flow.completed_at = Some(timestamp);
887 flow.updated_at = timestamp;
888 }
889 }
890 }
891 }
892
893 #[must_use]
896 pub fn replay(events: &[Event]) -> Self {
897 let mut state = Self::new();
898 for event in events {
899 state.apply_mut(event);
900 }
901 state
902 }
903}
904
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::CorrelationIds;

    /// Replaying the same event log twice must produce the same projection.
    #[test]
    fn replay_is_deterministic() {
        let project_id = Uuid::new_v4();
        let events = vec![
            Event::new(
                EventPayload::ProjectCreated {
                    id: project_id,
                    name: "test".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project_id),
            ),
            Event::new(
                EventPayload::ProjectUpdated {
                    id: project_id,
                    name: Some("updated".to_string()),
                    description: None,
                },
                CorrelationIds::for_project(project_id),
            ),
        ];

        let state1 = AppState::replay(&events);
        let state2 = AppState::replay(&events);

        assert_eq!(state1.projects.len(), state2.projects.len());

        let project1 = state1.projects.get(&project_id).unwrap();
        let project2 = state2.projects.get(&project_id).unwrap();
        assert_eq!(project1.name, project2.name);
        assert_eq!(project1.description, project2.description);
        // The update must actually have been applied, not merely agree.
        assert_eq!(project1.name, "updated");
    }

    /// Applying an already-applied creation event again must not duplicate
    /// or alter the project.
    #[test]
    fn replay_is_idempotent() {
        let project_id = Uuid::new_v4();
        let events = vec![Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: "test".to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        )];

        let mut state = AppState::replay(&events);
        assert_eq!(state.projects.len(), 1);

        // Feed the very same events into the existing state a second time.
        for event in &events {
            state.apply_mut(event);
        }

        assert_eq!(state.projects.len(), 1);
        let project = state.projects.get(&project_id).unwrap();
        assert_eq!(project.name, "test");
        assert_eq!(project.description, None);
    }

    /// A task is created open and ends up closed after `TaskClosed`.
    #[test]
    fn task_lifecycle() {
        let project_id = Uuid::new_v4();
        let task_id = Uuid::new_v4();

        let events = vec![
            Event::new(
                EventPayload::ProjectCreated {
                    id: project_id,
                    name: "proj".to_string(),
                    description: None,
                },
                CorrelationIds::for_project(project_id),
            ),
            Event::new(
                EventPayload::TaskCreated {
                    id: task_id,
                    project_id,
                    title: "task1".to_string(),
                    description: None,
                    scope: None,
                },
                CorrelationIds::for_task(project_id, task_id),
            ),
            Event::new(
                EventPayload::TaskClosed {
                    id: task_id,
                    reason: None,
                },
                CorrelationIds::for_task(project_id, task_id),
            ),
        ];

        // After only the first two events the task exists and is open.
        let open_state = AppState::replay(&events[..2]);
        assert_eq!(
            open_state.tasks.get(&task_id).unwrap().state,
            TaskState::Open
        );

        let state = AppState::replay(&events);
        let task = state.tasks.get(&task_id).unwrap();

        assert_eq!(task.state, TaskState::Closed);
        assert_eq!(task.project_id, project_id);
        assert_eq!(task.title, "task1");
    }
}