Skip to main content

hivemind/core/
state.rs

1//! State derived from events.
2//!
3//! All state in Hivemind is derived by replaying events. This ensures
4//! determinism, idempotency, and complete observability.
5use super::events::{Event, EventPayload};
6use super::flow::{FlowState, RetryMode, TaskExecState, TaskExecution, TaskFlow};
7use super::graph::{GraphState, TaskGraph};
8use super::scope::{RepoAccessMode, Scope};
9use super::verification::CheckResult;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use uuid::Uuid;
14
15const fn default_max_parallel_tasks() -> u16 {
16    1
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum AttemptCheckpointState {
22    Declared,
23    Active,
24    Completed,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct AttemptCheckpoint {
29    pub checkpoint_id: String,
30    pub order: u32,
31    pub total: u32,
32    pub state: AttemptCheckpointState,
33    #[serde(default)]
34    pub commit_hash: Option<String>,
35    #[serde(default)]
36    pub completed_at: Option<DateTime<Utc>>,
37    #[serde(default)]
38    pub summary: Option<String>,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct AttemptState {
43    pub id: Uuid,
44    pub flow_id: Uuid,
45    pub task_id: Uuid,
46    pub attempt_number: u32,
47    pub started_at: DateTime<Utc>,
48    pub baseline_id: Option<Uuid>,
49    pub diff_id: Option<Uuid>,
50    #[serde(default)]
51    pub check_results: Vec<CheckResult>,
52    #[serde(default)]
53    pub checkpoints: Vec<AttemptCheckpoint>,
54    #[serde(default)]
55    pub all_checkpoints_completed: bool,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct ProjectRuntimeConfig {
60    pub adapter_name: String,
61    pub binary_path: String,
62    #[serde(default)]
63    pub model: Option<String>,
64    #[serde(default)]
65    pub args: Vec<String>,
66    #[serde(default)]
67    pub env: HashMap<String, String>,
68    pub timeout_ms: u64,
69    #[serde(default = "default_max_parallel_tasks")]
70    pub max_parallel_tasks: u16,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct TaskRuntimeConfig {
75    pub adapter_name: String,
76    pub binary_path: String,
77    #[serde(default)]
78    pub model: Option<String>,
79    #[serde(default)]
80    pub args: Vec<String>,
81    #[serde(default)]
82    pub env: HashMap<String, String>,
83    pub timeout_ms: u64,
84}
85
86/// A project in the system.
87#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
88pub struct Project {
89    pub id: Uuid,
90    pub name: String,
91    pub description: Option<String>,
92    pub created_at: DateTime<Utc>,
93    pub updated_at: DateTime<Utc>,
94    pub repositories: Vec<Repository>,
95    #[serde(default)]
96    pub runtime: Option<ProjectRuntimeConfig>,
97}
98
99/// A repository attached to a project.
100#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101pub struct Repository {
102    pub name: String,
103    pub path: String,
104    #[serde(default)]
105    pub access_mode: RepoAccessMode,
106}
107
108/// Task state.
109#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum TaskState {
112    Open,
113    Closed,
114}
115
116/// A task in the system.
117#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
118pub struct Task {
119    pub id: Uuid,
120    pub project_id: Uuid,
121    pub title: String,
122    pub description: Option<String>,
123    #[serde(default)]
124    pub scope: Option<Scope>,
125    #[serde(default)]
126    pub runtime_override: Option<TaskRuntimeConfig>,
127    pub state: TaskState,
128    pub created_at: DateTime<Utc>,
129    pub updated_at: DateTime<Utc>,
130}
131
132/// Merge workflow status.
133#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
134#[serde(rename_all = "lowercase")]
135pub enum MergeStatus {
136    Prepared,
137    Approved,
138    Completed,
139}
140
141/// Merge state for a flow.
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct MergeState {
144    pub flow_id: Uuid,
145    pub status: MergeStatus,
146    pub target_branch: Option<String>,
147    pub conflicts: Vec<String>,
148    pub commits: Vec<String>,
149    pub updated_at: DateTime<Utc>,
150}
151
152/// The complete application state derived from events.
153#[derive(Debug, Default, Clone)]
154pub struct AppState {
155    pub projects: HashMap<Uuid, Project>,
156    pub tasks: HashMap<Uuid, Task>,
157    pub graphs: HashMap<Uuid, TaskGraph>,
158    pub flows: HashMap<Uuid, TaskFlow>,
159    pub merge_states: HashMap<Uuid, MergeState>,
160    pub attempts: HashMap<Uuid, AttemptState>,
161}
162
163impl AppState {
164    /// Creates a new empty state.
165    #[must_use]
166    pub fn new() -> Self {
167        Self::default()
168    }
169
170    /// Applies an event to the state, returning a new state.
171    #[must_use]
172    pub fn apply(mut self, event: &Event) -> Self {
173        self.apply_mut(event);
174        self
175    }
176
177    /// Applies an event to the state in place.
178    #[allow(clippy::too_many_lines)]
179    pub fn apply_mut(&mut self, event: &Event) {
180        let timestamp = event.timestamp();
181
182        match &event.payload {
183            EventPayload::ProjectCreated {
184                id,
185                name,
186                description,
187            } => {
188                self.projects.insert(
189                    *id,
190                    Project {
191                        id: *id,
192                        name: name.clone(),
193                        description: description.clone(),
194                        created_at: timestamp,
195                        updated_at: timestamp,
196                        repositories: Vec::new(),
197                        runtime: None,
198                    },
199                );
200            }
201            EventPayload::ProjectUpdated {
202                id,
203                name,
204                description,
205            } => {
206                if let Some(project) = self.projects.get_mut(id) {
207                    if let Some(n) = name {
208                        n.clone_into(&mut project.name);
209                    }
210                    if let Some(d) = description {
211                        project.description = Some(d.clone());
212                    }
213                    project.updated_at = timestamp;
214                }
215            }
216
217            EventPayload::TaskExecutionFrozen {
218                flow_id,
219                task_id,
220                commit_sha,
221            } => {
222                if let Some(flow) = self.flows.get_mut(flow_id) {
223                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
224                        exec.frozen_commit_sha.clone_from(commit_sha);
225                        exec.updated_at = timestamp;
226                        flow.updated_at = timestamp;
227                    }
228                }
229            }
230
231            EventPayload::TaskIntegratedIntoFlow {
232                flow_id,
233                task_id,
234                commit_sha,
235            } => {
236                if let Some(flow) = self.flows.get_mut(flow_id) {
237                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
238                        exec.integrated_commit_sha.clone_from(commit_sha);
239                        exec.updated_at = timestamp;
240                        flow.updated_at = timestamp;
241                    }
242                }
243            }
244            EventPayload::ProjectRuntimeConfigured {
245                project_id,
246                adapter_name,
247                binary_path,
248                model,
249                args,
250                env,
251                timeout_ms,
252                max_parallel_tasks,
253            } => {
254                if let Some(project) = self.projects.get_mut(project_id) {
255                    project.runtime = Some(ProjectRuntimeConfig {
256                        adapter_name: adapter_name.clone(),
257                        binary_path: binary_path.clone(),
258                        model: model.clone(),
259                        args: args.clone(),
260                        env: env.clone(),
261                        timeout_ms: *timeout_ms,
262                        max_parallel_tasks: *max_parallel_tasks,
263                    });
264                    project.updated_at = timestamp;
265                }
266            }
267            EventPayload::TaskCreated {
268                id,
269                project_id,
270                title,
271                description,
272                scope,
273            } => {
274                self.tasks.insert(
275                    *id,
276                    Task {
277                        id: *id,
278                        project_id: *project_id,
279                        title: title.clone(),
280                        description: description.clone(),
281                        scope: scope.clone(),
282                        runtime_override: None,
283                        state: TaskState::Open,
284                        created_at: timestamp,
285                        updated_at: timestamp,
286                    },
287                );
288            }
289            EventPayload::TaskUpdated {
290                id,
291                title,
292                description,
293            } => {
294                if let Some(task) = self.tasks.get_mut(id) {
295                    if let Some(t) = title {
296                        t.clone_into(&mut task.title);
297                    }
298                    if let Some(d) = description {
299                        task.description = Some(d.clone());
300                    }
301                    task.updated_at = timestamp;
302                }
303            }
304            EventPayload::TaskRuntimeConfigured {
305                task_id,
306                adapter_name,
307                binary_path,
308                model,
309                args,
310                env,
311                timeout_ms,
312            } => {
313                if let Some(task) = self.tasks.get_mut(task_id) {
314                    task.runtime_override = Some(TaskRuntimeConfig {
315                        adapter_name: adapter_name.clone(),
316                        binary_path: binary_path.clone(),
317                        model: model.clone(),
318                        args: args.clone(),
319                        env: env.clone(),
320                        timeout_ms: *timeout_ms,
321                    });
322                    task.updated_at = timestamp;
323                }
324            }
325            EventPayload::TaskRuntimeCleared { task_id } => {
326                if let Some(task) = self.tasks.get_mut(task_id) {
327                    task.runtime_override = None;
328                    task.updated_at = timestamp;
329                }
330            }
331            EventPayload::TaskClosed { id, reason: _ } => {
332                if let Some(task) = self.tasks.get_mut(id) {
333                    task.state = TaskState::Closed;
334                    task.updated_at = timestamp;
335                }
336            }
337            EventPayload::RepositoryAttached {
338                project_id,
339                path,
340                name,
341                access_mode,
342            } => {
343                if let Some(project) = self.projects.get_mut(project_id) {
344                    project.repositories.push(Repository {
345                        name: name.clone(),
346                        path: path.clone(),
347                        access_mode: *access_mode,
348                    });
349                    project.updated_at = timestamp;
350                }
351            }
352            EventPayload::RepositoryDetached { project_id, name } => {
353                if let Some(project) = self.projects.get_mut(project_id) {
354                    project.repositories.retain(|r| r.name != *name);
355                    project.updated_at = timestamp;
356                }
357            }
358
359            EventPayload::TaskGraphCreated {
360                graph_id,
361                project_id,
362                name,
363                description,
364            } => {
365                self.graphs.insert(
366                    *graph_id,
367                    TaskGraph {
368                        id: *graph_id,
369                        project_id: *project_id,
370                        name: name.clone(),
371                        description: description.clone(),
372                        state: GraphState::Draft,
373                        tasks: HashMap::new(),
374                        dependencies: HashMap::<Uuid, HashSet<Uuid>>::new(),
375                        created_at: timestamp,
376                        updated_at: timestamp,
377                    },
378                );
379            }
380            EventPayload::TaskAddedToGraph { graph_id, task } => {
381                if let Some(graph) = self.graphs.get_mut(graph_id) {
382                    graph.tasks.insert(task.id, task.clone());
383                    graph.dependencies.entry(task.id).or_default();
384                    graph.updated_at = timestamp;
385                }
386            }
387            EventPayload::DependencyAdded {
388                graph_id,
389                from_task,
390                to_task,
391            } => {
392                if let Some(graph) = self.graphs.get_mut(graph_id) {
393                    graph
394                        .dependencies
395                        .entry(*to_task)
396                        .or_default()
397                        .insert(*from_task);
398                    graph.updated_at = timestamp;
399                }
400            }
401            EventPayload::GraphTaskCheckAdded {
402                graph_id,
403                task_id,
404                check,
405            } => {
406                if let Some(graph) = self.graphs.get_mut(graph_id) {
407                    if let Some(task) = graph.tasks.get_mut(task_id) {
408                        task.criteria.checks.push(check.clone());
409                        graph.updated_at = timestamp;
410                    }
411                }
412            }
413            EventPayload::ScopeAssigned {
414                graph_id,
415                task_id,
416                scope,
417            } => {
418                if let Some(graph) = self.graphs.get_mut(graph_id) {
419                    if let Some(task) = graph.tasks.get_mut(task_id) {
420                        task.scope = Some(scope.clone());
421                        graph.updated_at = timestamp;
422                    }
423                }
424            }
425
426            EventPayload::TaskGraphValidated {
427                graph_id,
428                project_id: _,
429                valid,
430                issues: _,
431            } => {
432                if *valid {
433                    if let Some(graph) = self.graphs.get_mut(graph_id) {
434                        graph.state = GraphState::Validated;
435                        graph.updated_at = timestamp;
436                    }
437                }
438            }
439
440            EventPayload::TaskGraphLocked {
441                graph_id,
442                project_id: _,
443            } => {
444                if let Some(graph) = self.graphs.get_mut(graph_id) {
445                    graph.state = GraphState::Locked;
446                    graph.updated_at = timestamp;
447                }
448            }
449            EventPayload::TaskFlowCreated {
450                flow_id,
451                graph_id,
452                project_id,
453                name: _,
454                task_ids,
455            } => {
456                if let Some(graph) = self.graphs.get_mut(graph_id) {
457                    graph.state = GraphState::Locked;
458                    graph.updated_at = timestamp;
459                }
460
461                let mut task_executions = HashMap::new();
462                for task_id in task_ids {
463                    task_executions.insert(
464                        *task_id,
465                        TaskExecution {
466                            task_id: *task_id,
467                            state: TaskExecState::Pending,
468                            attempt_count: 0,
469                            retry_mode: RetryMode::default(),
470                            frozen_commit_sha: None,
471                            integrated_commit_sha: None,
472                            updated_at: timestamp,
473                            blocked_reason: None,
474                        },
475                    );
476                }
477
478                self.flows.insert(
479                    *flow_id,
480                    TaskFlow {
481                        id: *flow_id,
482                        graph_id: *graph_id,
483                        project_id: *project_id,
484                        base_revision: None,
485                        state: FlowState::Created,
486                        task_executions,
487                        created_at: timestamp,
488                        started_at: None,
489                        completed_at: None,
490                        updated_at: timestamp,
491                    },
492                );
493            }
494            EventPayload::TaskFlowStarted {
495                flow_id,
496                base_revision,
497            } => {
498                if let Some(flow) = self.flows.get_mut(flow_id) {
499                    flow.state = FlowState::Running;
500                    flow.started_at = Some(timestamp);
501                    flow.base_revision.clone_from(base_revision);
502                    flow.updated_at = timestamp;
503                }
504            }
505            EventPayload::TaskFlowPaused {
506                flow_id,
507                running_tasks: _,
508            } => {
509                if let Some(flow) = self.flows.get_mut(flow_id) {
510                    flow.state = FlowState::Paused;
511                    flow.updated_at = timestamp;
512                }
513            }
514            EventPayload::TaskFlowResumed { flow_id } => {
515                if let Some(flow) = self.flows.get_mut(flow_id) {
516                    flow.state = FlowState::Running;
517                    flow.updated_at = timestamp;
518                }
519            }
520            EventPayload::TaskFlowCompleted { flow_id } => {
521                if let Some(flow) = self.flows.get_mut(flow_id) {
522                    flow.state = FlowState::Completed;
523                    flow.completed_at = Some(timestamp);
524                    flow.updated_at = timestamp;
525                }
526            }
527
528            EventPayload::FlowFrozenForMerge { flow_id } => {
529                if let Some(flow) = self.flows.get_mut(flow_id) {
530                    flow.state = FlowState::FrozenForMerge;
531                    flow.updated_at = timestamp;
532                }
533            }
534            EventPayload::TaskFlowAborted {
535                flow_id,
536                reason: _,
537                forced: _,
538            } => {
539                if let Some(flow) = self.flows.get_mut(flow_id) {
540                    flow.state = FlowState::Aborted;
541                    flow.completed_at = Some(timestamp);
542                    flow.updated_at = timestamp;
543                }
544            }
545            EventPayload::TaskReady { flow_id, task_id } => {
546                if let Some(flow) = self.flows.get_mut(flow_id) {
547                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
548                        exec.state = TaskExecState::Ready;
549                        exec.blocked_reason = None;
550                        exec.updated_at = timestamp;
551                    }
552                    flow.updated_at = timestamp;
553                }
554            }
555            EventPayload::TaskBlocked {
556                flow_id,
557                task_id,
558                reason,
559            } => {
560                if let Some(flow) = self.flows.get_mut(flow_id) {
561                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
562                        exec.state = TaskExecState::Pending;
563                        exec.blocked_reason.clone_from(reason);
564                        exec.updated_at = timestamp;
565                    }
566                    flow.updated_at = timestamp;
567                }
568            }
569            EventPayload::TaskExecutionStateChanged {
570                flow_id,
571                task_id,
572                from: _,
573                to,
574            } => {
575                if let Some(flow) = self.flows.get_mut(flow_id) {
576                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
577                        exec.state = *to;
578                        exec.updated_at = timestamp;
579                        exec.blocked_reason = None;
580                        if *to == TaskExecState::Running {
581                            exec.attempt_count += 1;
582                        }
583                    }
584                    flow.updated_at = timestamp;
585                }
586            }
587
588            EventPayload::AttemptStarted {
589                flow_id,
590                task_id,
591                attempt_id,
592                attempt_number,
593            } => {
594                self.attempts.insert(
595                    *attempt_id,
596                    AttemptState {
597                        id: *attempt_id,
598                        flow_id: *flow_id,
599                        task_id: *task_id,
600                        attempt_number: *attempt_number,
601                        started_at: timestamp,
602                        baseline_id: None,
603                        diff_id: None,
604                        check_results: Vec::new(),
605                        checkpoints: Vec::new(),
606                        all_checkpoints_completed: false,
607                    },
608                );
609            }
610
611            EventPayload::CheckpointDeclared {
612                attempt_id,
613                checkpoint_id,
614                order,
615                total,
616                ..
617            } => {
618                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
619                    let exists = attempt
620                        .checkpoints
621                        .iter()
622                        .any(|cp| cp.checkpoint_id == *checkpoint_id);
623                    if !exists {
624                        attempt.checkpoints.push(AttemptCheckpoint {
625                            checkpoint_id: checkpoint_id.clone(),
626                            order: *order,
627                            total: *total,
628                            state: AttemptCheckpointState::Declared,
629                            commit_hash: None,
630                            completed_at: None,
631                            summary: None,
632                        });
633                        attempt.checkpoints.sort_by_key(|cp| cp.order);
634                    }
635                }
636            }
637
638            EventPayload::CheckpointActivated {
639                attempt_id,
640                checkpoint_id,
641                ..
642            } => {
643                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
644                    for cp in &mut attempt.checkpoints {
645                        if cp.checkpoint_id == *checkpoint_id {
646                            cp.state = AttemptCheckpointState::Active;
647                        } else if cp.state != AttemptCheckpointState::Completed {
648                            cp.state = AttemptCheckpointState::Declared;
649                        }
650                    }
651                }
652            }
653
654            EventPayload::CheckpointCompleted {
655                attempt_id,
656                checkpoint_id,
657                commit_hash,
658                timestamp,
659                summary,
660                ..
661            } => {
662                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
663                    if let Some(cp) = attempt
664                        .checkpoints
665                        .iter_mut()
666                        .find(|cp| cp.checkpoint_id == *checkpoint_id)
667                    {
668                        cp.state = AttemptCheckpointState::Completed;
669                        cp.commit_hash = Some(commit_hash.clone());
670                        cp.completed_at = Some(*timestamp);
671                        cp.summary.clone_from(summary);
672                    }
673                }
674            }
675
676            EventPayload::AllCheckpointsCompleted { attempt_id, .. } => {
677                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
678                    attempt.all_checkpoints_completed = true;
679                }
680            }
681
682            EventPayload::BaselineCaptured {
683                attempt_id,
684                baseline_id,
685                ..
686            } => {
687                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
688                    attempt.baseline_id = Some(*baseline_id);
689                }
690            }
691
692            EventPayload::DiffComputed {
693                attempt_id,
694                diff_id,
695                ..
696            } => {
697                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
698                    attempt.diff_id = Some(*diff_id);
699                }
700            }
701
702            EventPayload::CheckCompleted {
703                attempt_id,
704                check_name,
705                passed,
706                exit_code,
707                output,
708                duration_ms,
709                required,
710                ..
711            } => {
712                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
713                    attempt.check_results.push(CheckResult {
714                        name: check_name.clone(),
715                        passed: *passed,
716                        exit_code: *exit_code,
717                        output: output.clone(),
718                        duration_ms: *duration_ms,
719                        required: *required,
720                    });
721                }
722            }
723
724            EventPayload::CheckStarted { .. }
725            | EventPayload::ErrorOccurred { .. }
726            | EventPayload::TaskExecutionStarted { .. }
727            | EventPayload::TaskExecutionSucceeded { .. }
728            | EventPayload::TaskExecutionFailed { .. }
729            | EventPayload::MergeConflictDetected { .. }
730            | EventPayload::MergeCheckStarted { .. }
731            | EventPayload::MergeCheckCompleted { .. }
732            | EventPayload::RuntimeStarted { .. }
733            | EventPayload::RuntimeOutputChunk { .. }
734            | EventPayload::RuntimeInputProvided { .. }
735            | EventPayload::RuntimeInterrupted { .. }
736            | EventPayload::RuntimeExited { .. }
737            | EventPayload::RuntimeTerminated { .. }
738            | EventPayload::RuntimeFilesystemObserved { .. }
739            | EventPayload::RuntimeCommandObserved { .. }
740            | EventPayload::RuntimeToolCallObserved { .. }
741            | EventPayload::RuntimeTodoSnapshotUpdated { .. }
742            | EventPayload::RuntimeNarrativeOutputObserved { .. }
743            | EventPayload::FileModified { .. }
744            | EventPayload::CheckpointCommitCreated { .. }
745            | EventPayload::ScopeValidated { .. }
746            | EventPayload::ScopeViolationDetected { .. }
747            | EventPayload::ScopeConflictDetected { .. }
748            | EventPayload::TaskSchedulingDeferred { .. }
749            | EventPayload::RetryContextAssembled { .. }
750            | EventPayload::FlowIntegrationLockAcquired { .. }
751            | EventPayload::Unknown => {}
752
753            EventPayload::TaskRetryRequested {
754                task_id,
755                reset_count,
756                retry_mode,
757            } => {
758                let flow_id = event.metadata.correlation.flow_id;
759                let mut candidate_flow_ids = Vec::new();
760
761                if let Some(fid) = flow_id {
762                    candidate_flow_ids.push(fid);
763                } else {
764                    for (fid, flow) in &self.flows {
765                        if flow.task_executions.contains_key(task_id) {
766                            candidate_flow_ids.push(*fid);
767                        }
768                    }
769                }
770
771                for fid in candidate_flow_ids {
772                    if let Some(flow) = self.flows.get_mut(&fid) {
773                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
774                            exec.state = TaskExecState::Pending;
775                            exec.blocked_reason = None;
776                            exec.updated_at = timestamp;
777                            exec.retry_mode = *retry_mode;
778                            if *reset_count {
779                                exec.attempt_count = 0;
780                            }
781                            flow.updated_at = timestamp;
782                            break;
783                        }
784                    }
785                }
786            }
787
788            EventPayload::TaskAborted { task_id, reason: _ } => {
789                let flow_id = event.metadata.correlation.flow_id;
790                let mut candidate_flow_ids = Vec::new();
791
792                if let Some(fid) = flow_id {
793                    candidate_flow_ids.push(fid);
794                } else {
795                    for (fid, flow) in &self.flows {
796                        if flow.task_executions.contains_key(task_id) {
797                            candidate_flow_ids.push(*fid);
798                        }
799                    }
800                }
801
802                for fid in candidate_flow_ids {
803                    if let Some(flow) = self.flows.get_mut(&fid) {
804                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
805                            exec.state = TaskExecState::Failed;
806                            exec.blocked_reason = None;
807                            exec.updated_at = timestamp;
808                            flow.updated_at = timestamp;
809                            break;
810                        }
811                    }
812                }
813            }
814
815            EventPayload::HumanOverride {
816                task_id,
817                override_type: _,
818                decision,
819                reason: _,
820                user: _,
821            } => {
822                let flow_id = event.metadata.correlation.flow_id;
823                let mut candidate_flow_ids = Vec::new();
824
825                if let Some(fid) = flow_id {
826                    candidate_flow_ids.push(fid);
827                } else {
828                    for (fid, flow) in &self.flows {
829                        if flow.task_executions.contains_key(task_id) {
830                            candidate_flow_ids.push(*fid);
831                        }
832                    }
833                }
834
835                let new_state = if decision == "pass" {
836                    TaskExecState::Success
837                } else {
838                    TaskExecState::Failed
839                };
840
841                for fid in candidate_flow_ids {
842                    if let Some(flow) = self.flows.get_mut(&fid) {
843                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
844                            exec.state = new_state;
845                            exec.blocked_reason = None;
846                            exec.updated_at = timestamp;
847                            flow.updated_at = timestamp;
848                            break;
849                        }
850                    }
851                }
852            }
853
854            EventPayload::MergePrepared {
855                flow_id,
856                target_branch,
857                conflicts,
858            } => {
859                self.merge_states.insert(
860                    *flow_id,
861                    MergeState {
862                        flow_id: *flow_id,
863                        status: MergeStatus::Prepared,
864                        target_branch: target_branch.clone(),
865                        conflicts: conflicts.clone(),
866                        commits: Vec::new(),
867                        updated_at: timestamp,
868                    },
869                );
870            }
871            EventPayload::MergeApproved { flow_id, user: _ } => {
872                if let Some(ms) = self.merge_states.get_mut(flow_id) {
873                    ms.status = MergeStatus::Approved;
874                    ms.updated_at = timestamp;
875                }
876            }
877            EventPayload::MergeCompleted { flow_id, commits } => {
878                if let Some(ms) = self.merge_states.get_mut(flow_id) {
879                    ms.status = MergeStatus::Completed;
880                    commits.clone_into(&mut ms.commits);
881                    ms.updated_at = timestamp;
882                }
883
884                if let Some(flow) = self.flows.get_mut(flow_id) {
885                    flow.state = FlowState::Merged;
886                    flow.completed_at = Some(timestamp);
887                    flow.updated_at = timestamp;
888                }
889            }
890        }
891    }
892
893    /// Replays a sequence of events to produce state.
894    /// Deterministic: same events → same state.
895    #[must_use]
896    pub fn replay(events: &[Event]) -> Self {
897        let mut state = Self::new();
898        for event in events {
899            state.apply_mut(event);
900        }
901        state
902    }
903}
904
905#[cfg(test)]
906mod tests {
907    use super::*;
908    use crate::core::events::CorrelationIds;
909
910    #[test]
911    fn replay_is_deterministic() {
912        let project_id = Uuid::new_v4();
913        let events = vec![
914            Event::new(
915                EventPayload::ProjectCreated {
916                    id: project_id,
917                    name: "test".to_string(),
918                    description: None,
919                },
920                CorrelationIds::for_project(project_id),
921            ),
922            Event::new(
923                EventPayload::ProjectUpdated {
924                    id: project_id,
925                    name: Some("updated".to_string()),
926                    description: None,
927                },
928                CorrelationIds::for_project(project_id),
929            ),
930        ];
931
932        let state1 = AppState::replay(&events);
933        let state2 = AppState::replay(&events);
934
935        assert_eq!(state1.projects.len(), state2.projects.len());
936        assert_eq!(
937            state1.projects.get(&project_id).unwrap().name,
938            state2.projects.get(&project_id).unwrap().name
939        );
940    }
941
942    #[test]
943    fn replay_is_idempotent() {
944        let project_id = Uuid::new_v4();
945        let events = vec![Event::new(
946            EventPayload::ProjectCreated {
947                id: project_id,
948                name: "test".to_string(),
949                description: None,
950            },
951            CorrelationIds::for_project(project_id),
952        )];
953
954        let state1 = AppState::replay(&events);
955        let state2 = AppState::replay(&events);
956
957        assert_eq!(state1.projects.len(), 1);
958        assert_eq!(state2.projects.len(), 1);
959    }
960
961    #[test]
962    fn task_lifecycle() {
963        let project_id = Uuid::new_v4();
964        let task_id = Uuid::new_v4();
965
966        let events = vec![
967            Event::new(
968                EventPayload::ProjectCreated {
969                    id: project_id,
970                    name: "proj".to_string(),
971                    description: None,
972                },
973                CorrelationIds::for_project(project_id),
974            ),
975            Event::new(
976                EventPayload::TaskCreated {
977                    id: task_id,
978                    project_id,
979                    title: "task1".to_string(),
980                    description: None,
981                    scope: None,
982                },
983                CorrelationIds::for_task(project_id, task_id),
984            ),
985            Event::new(
986                EventPayload::TaskClosed {
987                    id: task_id,
988                    reason: None,
989                },
990                CorrelationIds::for_task(project_id, task_id),
991            ),
992        ];
993
994        let state = AppState::replay(&events);
995        let task = state.tasks.get(&task_id).unwrap();
996
997        assert_eq!(task.state, TaskState::Closed);
998    }
999}