Skip to main content

hivemind/core/
state.rs

1//! State derived from events.
2//!
3//! All state in Hivemind is derived by replaying events. This ensures
4//! determinism, idempotency, and complete observability.
5use super::events::{Event, EventPayload, RuntimeRole};
6use super::flow::{FlowState, RetryMode, RunMode, TaskExecState, TaskExecution, TaskFlow};
7use super::graph::{GraphState, TaskGraph};
8use super::scope::{RepoAccessMode, Scope};
9use super::verification::CheckResult;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use uuid::Uuid;
14
/// Serde fallback for [`ProjectRuntimeConfig::max_parallel_tasks`] when the
/// field is missing from serialized data: one task at a time.
const fn default_max_parallel_tasks() -> u16 {
    const SINGLE_TASK: u16 = 1;
    SINGLE_TASK
}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum AttemptCheckpointState {
22    Declared,
23    Active,
24    Completed,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub struct AttemptCheckpoint {
29    pub checkpoint_id: String,
30    pub order: u32,
31    pub total: u32,
32    pub state: AttemptCheckpointState,
33    #[serde(default)]
34    pub commit_hash: Option<String>,
35    #[serde(default)]
36    pub completed_at: Option<DateTime<Utc>>,
37    #[serde(default)]
38    pub summary: Option<String>,
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42pub struct AttemptState {
43    pub id: Uuid,
44    pub flow_id: Uuid,
45    pub task_id: Uuid,
46    pub attempt_number: u32,
47    pub started_at: DateTime<Utc>,
48    pub baseline_id: Option<Uuid>,
49    pub diff_id: Option<Uuid>,
50    #[serde(default)]
51    pub check_results: Vec<CheckResult>,
52    #[serde(default)]
53    pub checkpoints: Vec<AttemptCheckpoint>,
54    #[serde(default)]
55    pub all_checkpoints_completed: bool,
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
59pub struct ProjectRuntimeConfig {
60    pub adapter_name: String,
61    pub binary_path: String,
62    #[serde(default)]
63    pub model: Option<String>,
64    #[serde(default)]
65    pub args: Vec<String>,
66    #[serde(default)]
67    pub env: HashMap<String, String>,
68    pub timeout_ms: u64,
69    #[serde(default = "default_max_parallel_tasks")]
70    pub max_parallel_tasks: u16,
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct TaskRuntimeConfig {
75    pub adapter_name: String,
76    pub binary_path: String,
77    #[serde(default)]
78    pub model: Option<String>,
79    #[serde(default)]
80    pub args: Vec<String>,
81    #[serde(default)]
82    pub env: HashMap<String, String>,
83    pub timeout_ms: u64,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
87pub struct RuntimeRoleDefaults {
88    #[serde(default)]
89    pub worker: Option<ProjectRuntimeConfig>,
90    #[serde(default)]
91    pub validator: Option<ProjectRuntimeConfig>,
92}
93
94impl RuntimeRoleDefaults {
95    pub fn set(&mut self, role: RuntimeRole, config: Option<ProjectRuntimeConfig>) {
96        match role {
97            RuntimeRole::Worker => self.worker = config,
98            RuntimeRole::Validator => self.validator = config,
99        }
100    }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104pub struct GovernanceProjectStorage {
105    pub project_id: Uuid,
106    pub schema_version: String,
107    pub projection_version: u32,
108    pub root_path: String,
109    pub initialized_at: DateTime<Utc>,
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
113pub struct GovernanceArtifact {
114    #[serde(default)]
115    pub project_id: Option<Uuid>,
116    pub scope: String,
117    pub artifact_kind: String,
118    pub artifact_key: String,
119    pub path: String,
120    #[serde(default)]
121    pub revision: u64,
122    pub schema_version: String,
123    pub projection_version: u32,
124    pub updated_at: DateTime<Utc>,
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
128pub struct GovernanceAttachment {
129    pub project_id: Uuid,
130    pub task_id: Uuid,
131    pub artifact_kind: String,
132    pub artifact_key: String,
133    pub attached: bool,
134    pub schema_version: String,
135    pub projection_version: u32,
136    pub updated_at: DateTime<Utc>,
137}
138
139#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct GovernanceMigration {
141    #[serde(default)]
142    pub project_id: Option<Uuid>,
143    pub from_layout: String,
144    pub to_layout: String,
145    #[serde(default)]
146    pub migrated_paths: Vec<String>,
147    pub rollback_hint: String,
148    pub schema_version: String,
149    pub projection_version: u32,
150    pub migrated_at: DateTime<Utc>,
151}
152
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
154pub struct TaskRuntimeRoleOverrides {
155    #[serde(default)]
156    pub worker: Option<TaskRuntimeConfig>,
157    #[serde(default)]
158    pub validator: Option<TaskRuntimeConfig>,
159}
160
161impl TaskRuntimeRoleOverrides {
162    pub fn set(&mut self, role: RuntimeRole, config: Option<TaskRuntimeConfig>) {
163        match role {
164            RuntimeRole::Worker => self.worker = config,
165            RuntimeRole::Validator => self.validator = config,
166        }
167    }
168}
169
170/// A project in the system.
171#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
172pub struct Project {
173    pub id: Uuid,
174    pub name: String,
175    pub description: Option<String>,
176    pub created_at: DateTime<Utc>,
177    pub updated_at: DateTime<Utc>,
178    pub repositories: Vec<Repository>,
179    #[serde(default)]
180    pub runtime: Option<ProjectRuntimeConfig>,
181    #[serde(default)]
182    pub runtime_defaults: RuntimeRoleDefaults,
183    #[serde(default)]
184    pub constitution_digest: Option<String>,
185    #[serde(default)]
186    pub constitution_schema_version: Option<String>,
187    #[serde(default)]
188    pub constitution_version: Option<u32>,
189    #[serde(default)]
190    pub constitution_updated_at: Option<DateTime<Utc>>,
191}
192
193/// A repository attached to a project.
194#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
195pub struct Repository {
196    pub name: String,
197    pub path: String,
198    #[serde(default)]
199    pub access_mode: RepoAccessMode,
200}
201
202/// Task state.
203#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
204#[serde(rename_all = "lowercase")]
205pub enum TaskState {
206    Open,
207    Closed,
208}
209
210/// A task in the system.
211#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
212pub struct Task {
213    pub id: Uuid,
214    pub project_id: Uuid,
215    pub title: String,
216    pub description: Option<String>,
217    #[serde(default)]
218    pub scope: Option<Scope>,
219    #[serde(default)]
220    pub runtime_override: Option<TaskRuntimeConfig>,
221    #[serde(default)]
222    pub runtime_overrides: TaskRuntimeRoleOverrides,
223    #[serde(default)]
224    pub run_mode: RunMode,
225    pub state: TaskState,
226    pub created_at: DateTime<Utc>,
227    pub updated_at: DateTime<Utc>,
228}
229
230/// Merge workflow status.
231#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
232#[serde(rename_all = "lowercase")]
233pub enum MergeStatus {
234    Prepared,
235    Approved,
236    Completed,
237}
238
239/// Merge state for a flow.
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct MergeState {
242    pub flow_id: Uuid,
243    pub status: MergeStatus,
244    pub target_branch: Option<String>,
245    pub conflicts: Vec<String>,
246    pub commits: Vec<String>,
247    pub updated_at: DateTime<Utc>,
248}
249
250/// The complete application state derived from events.
251#[derive(Debug, Default, Clone)]
252pub struct AppState {
253    pub projects: HashMap<Uuid, Project>,
254    pub governance_projects: HashMap<Uuid, GovernanceProjectStorage>,
255    pub governance_artifacts: HashMap<String, GovernanceArtifact>,
256    pub governance_attachments: HashMap<String, GovernanceAttachment>,
257    pub governance_migrations: Vec<GovernanceMigration>,
258    pub tasks: HashMap<Uuid, Task>,
259    pub graphs: HashMap<Uuid, TaskGraph>,
260    pub flows: HashMap<Uuid, TaskFlow>,
261    pub global_runtime_defaults: RuntimeRoleDefaults,
262    pub flow_runtime_defaults: HashMap<Uuid, RuntimeRoleDefaults>,
263    pub merge_states: HashMap<Uuid, MergeState>,
264    pub attempts: HashMap<Uuid, AttemptState>,
265}
266
267impl AppState {
268    /// Creates a new empty state.
269    #[must_use]
270    pub fn new() -> Self {
271        Self::default()
272    }
273
274    /// Applies an event to the state, returning a new state.
275    #[must_use]
276    pub fn apply(mut self, event: &Event) -> Self {
277        self.apply_mut(event);
278        self
279    }
280
281    /// Applies an event to the state in place.
282    #[allow(clippy::too_many_lines)]
283    pub fn apply_mut(&mut self, event: &Event) {
284        let timestamp = event.timestamp();
285
286        match &event.payload {
287            EventPayload::ProjectCreated {
288                id,
289                name,
290                description,
291            } => {
292                self.projects.insert(
293                    *id,
294                    Project {
295                        id: *id,
296                        name: name.clone(),
297                        description: description.clone(),
298                        created_at: timestamp,
299                        updated_at: timestamp,
300                        repositories: Vec::new(),
301                        runtime: None,
302                        runtime_defaults: RuntimeRoleDefaults::default(),
303                        constitution_digest: None,
304                        constitution_schema_version: None,
305                        constitution_version: None,
306                        constitution_updated_at: None,
307                    },
308                );
309            }
310            EventPayload::ProjectUpdated {
311                id,
312                name,
313                description,
314            } => {
315                if let Some(project) = self.projects.get_mut(id) {
316                    if let Some(n) = name {
317                        n.clone_into(&mut project.name);
318                    }
319                    if let Some(d) = description {
320                        project.description = Some(d.clone());
321                    }
322                    project.updated_at = timestamp;
323                }
324            }
325
326            EventPayload::TaskExecutionFrozen {
327                flow_id,
328                task_id,
329                commit_sha,
330            } => {
331                if let Some(flow) = self.flows.get_mut(flow_id) {
332                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
333                        exec.frozen_commit_sha.clone_from(commit_sha);
334                        exec.updated_at = timestamp;
335                        flow.updated_at = timestamp;
336                    }
337                }
338            }
339
340            EventPayload::TaskIntegratedIntoFlow {
341                flow_id,
342                task_id,
343                commit_sha,
344            } => {
345                if let Some(flow) = self.flows.get_mut(flow_id) {
346                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
347                        exec.integrated_commit_sha.clone_from(commit_sha);
348                        exec.updated_at = timestamp;
349                        flow.updated_at = timestamp;
350                    }
351                }
352            }
353            EventPayload::ProjectRuntimeConfigured {
354                project_id,
355                adapter_name,
356                binary_path,
357                model,
358                args,
359                env,
360                timeout_ms,
361                max_parallel_tasks,
362            } => {
363                if let Some(project) = self.projects.get_mut(project_id) {
364                    let configured = ProjectRuntimeConfig {
365                        adapter_name: adapter_name.clone(),
366                        binary_path: binary_path.clone(),
367                        model: model.clone(),
368                        args: args.clone(),
369                        env: env.clone(),
370                        timeout_ms: *timeout_ms,
371                        max_parallel_tasks: *max_parallel_tasks,
372                    };
373                    project.runtime = Some(configured.clone());
374                    project.runtime_defaults.worker = Some(configured);
375                    project.updated_at = timestamp;
376                }
377            }
378            EventPayload::ProjectRuntimeRoleConfigured {
379                project_id,
380                role,
381                adapter_name,
382                binary_path,
383                model,
384                args,
385                env,
386                timeout_ms,
387                max_parallel_tasks,
388            } => {
389                if let Some(project) = self.projects.get_mut(project_id) {
390                    let configured = ProjectRuntimeConfig {
391                        adapter_name: adapter_name.clone(),
392                        binary_path: binary_path.clone(),
393                        model: model.clone(),
394                        args: args.clone(),
395                        env: env.clone(),
396                        timeout_ms: *timeout_ms,
397                        max_parallel_tasks: *max_parallel_tasks,
398                    };
399                    project
400                        .runtime_defaults
401                        .set(*role, Some(configured.clone()));
402                    if *role == RuntimeRole::Worker {
403                        project.runtime = Some(configured);
404                    }
405                    project.updated_at = timestamp;
406                }
407            }
408            EventPayload::GlobalRuntimeConfigured {
409                role,
410                adapter_name,
411                binary_path,
412                model,
413                args,
414                env,
415                timeout_ms,
416                max_parallel_tasks,
417            } => {
418                let configured = ProjectRuntimeConfig {
419                    adapter_name: adapter_name.clone(),
420                    binary_path: binary_path.clone(),
421                    model: model.clone(),
422                    args: args.clone(),
423                    env: env.clone(),
424                    timeout_ms: *timeout_ms,
425                    max_parallel_tasks: *max_parallel_tasks,
426                };
427                self.global_runtime_defaults.set(*role, Some(configured));
428            }
429            EventPayload::TaskCreated {
430                id,
431                project_id,
432                title,
433                description,
434                scope,
435            } => {
436                self.tasks.insert(
437                    *id,
438                    Task {
439                        id: *id,
440                        project_id: *project_id,
441                        title: title.clone(),
442                        description: description.clone(),
443                        scope: scope.clone(),
444                        runtime_override: None,
445                        runtime_overrides: TaskRuntimeRoleOverrides::default(),
446                        run_mode: RunMode::Auto,
447                        state: TaskState::Open,
448                        created_at: timestamp,
449                        updated_at: timestamp,
450                    },
451                );
452            }
453            EventPayload::TaskUpdated {
454                id,
455                title,
456                description,
457            } => {
458                if let Some(task) = self.tasks.get_mut(id) {
459                    if let Some(t) = title {
460                        t.clone_into(&mut task.title);
461                    }
462                    if let Some(d) = description {
463                        task.description = Some(d.clone());
464                    }
465                    task.updated_at = timestamp;
466                }
467            }
468            EventPayload::TaskRuntimeConfigured {
469                task_id,
470                adapter_name,
471                binary_path,
472                model,
473                args,
474                env,
475                timeout_ms,
476            } => {
477                if let Some(task) = self.tasks.get_mut(task_id) {
478                    let configured = TaskRuntimeConfig {
479                        adapter_name: adapter_name.clone(),
480                        binary_path: binary_path.clone(),
481                        model: model.clone(),
482                        args: args.clone(),
483                        env: env.clone(),
484                        timeout_ms: *timeout_ms,
485                    };
486                    task.runtime_override = Some(configured.clone());
487                    task.runtime_overrides.worker = Some(configured);
488                    task.updated_at = timestamp;
489                }
490            }
491            EventPayload::TaskRuntimeRoleConfigured {
492                task_id,
493                role,
494                adapter_name,
495                binary_path,
496                model,
497                args,
498                env,
499                timeout_ms,
500            } => {
501                if let Some(task) = self.tasks.get_mut(task_id) {
502                    let configured = TaskRuntimeConfig {
503                        adapter_name: adapter_name.clone(),
504                        binary_path: binary_path.clone(),
505                        model: model.clone(),
506                        args: args.clone(),
507                        env: env.clone(),
508                        timeout_ms: *timeout_ms,
509                    };
510                    task.runtime_overrides.set(*role, Some(configured.clone()));
511                    if *role == RuntimeRole::Worker {
512                        task.runtime_override = Some(configured);
513                    }
514                    task.updated_at = timestamp;
515                }
516            }
517            EventPayload::TaskRuntimeCleared { task_id } => {
518                if let Some(task) = self.tasks.get_mut(task_id) {
519                    task.runtime_override = None;
520                    task.runtime_overrides.worker = None;
521                    task.updated_at = timestamp;
522                }
523            }
524            EventPayload::TaskRuntimeRoleCleared { task_id, role } => {
525                if let Some(task) = self.tasks.get_mut(task_id) {
526                    task.runtime_overrides.set(*role, None);
527                    if *role == RuntimeRole::Worker {
528                        task.runtime_override = None;
529                    }
530                    task.updated_at = timestamp;
531                }
532            }
533            EventPayload::TaskRunModeSet { task_id, mode } => {
534                if let Some(task) = self.tasks.get_mut(task_id) {
535                    task.run_mode = *mode;
536                    task.updated_at = timestamp;
537                }
538            }
539            EventPayload::TaskClosed { id, reason: _ } => {
540                if let Some(task) = self.tasks.get_mut(id) {
541                    task.state = TaskState::Closed;
542                    task.updated_at = timestamp;
543                }
544            }
545            EventPayload::RepositoryAttached {
546                project_id,
547                path,
548                name,
549                access_mode,
550            } => {
551                if let Some(project) = self.projects.get_mut(project_id) {
552                    project.repositories.push(Repository {
553                        name: name.clone(),
554                        path: path.clone(),
555                        access_mode: *access_mode,
556                    });
557                    project.updated_at = timestamp;
558                }
559            }
560            EventPayload::RepositoryDetached { project_id, name } => {
561                if let Some(project) = self.projects.get_mut(project_id) {
562                    project.repositories.retain(|r| r.name != *name);
563                    project.updated_at = timestamp;
564                }
565            }
566
567            EventPayload::GovernanceProjectStorageInitialized {
568                project_id,
569                schema_version,
570                projection_version,
571                root_path,
572            } => {
573                self.governance_projects.insert(
574                    *project_id,
575                    GovernanceProjectStorage {
576                        project_id: *project_id,
577                        schema_version: schema_version.clone(),
578                        projection_version: *projection_version,
579                        root_path: root_path.clone(),
580                        initialized_at: timestamp,
581                    },
582                );
583            }
584            EventPayload::GovernanceArtifactUpserted {
585                project_id,
586                scope,
587                artifact_kind,
588                artifact_key,
589                path,
590                revision,
591                schema_version,
592                projection_version,
593            } => {
594                let project_key =
595                    project_id.map_or_else(|| "global".to_string(), |id| id.to_string());
596                let key = format!("{project_key}::{scope}::{artifact_kind}::{artifact_key}");
597                self.governance_artifacts.insert(
598                    key,
599                    GovernanceArtifact {
600                        project_id: *project_id,
601                        scope: scope.clone(),
602                        artifact_kind: artifact_kind.clone(),
603                        artifact_key: artifact_key.clone(),
604                        path: path.clone(),
605                        revision: *revision,
606                        schema_version: schema_version.clone(),
607                        projection_version: *projection_version,
608                        updated_at: timestamp,
609                    },
610                );
611            }
612            EventPayload::GovernanceArtifactDeleted {
613                project_id,
614                scope,
615                artifact_kind,
616                artifact_key,
617                path: _,
618                schema_version: _,
619                projection_version: _,
620            } => {
621                let project_key =
622                    project_id.map_or_else(|| "global".to_string(), |id| id.to_string());
623                let key = format!("{project_key}::{scope}::{artifact_kind}::{artifact_key}");
624                self.governance_artifacts.remove(&key);
625            }
626            EventPayload::GovernanceAttachmentLifecycleUpdated {
627                project_id,
628                task_id,
629                artifact_kind,
630                artifact_key,
631                attached,
632                schema_version,
633                projection_version,
634            } => {
635                let key = format!("{project_id}::{task_id}::{artifact_kind}::{artifact_key}");
636                self.governance_attachments.insert(
637                    key,
638                    GovernanceAttachment {
639                        project_id: *project_id,
640                        task_id: *task_id,
641                        artifact_kind: artifact_kind.clone(),
642                        artifact_key: artifact_key.clone(),
643                        attached: *attached,
644                        schema_version: schema_version.clone(),
645                        projection_version: *projection_version,
646                        updated_at: timestamp,
647                    },
648                );
649            }
650            EventPayload::GovernanceStorageMigrated {
651                project_id,
652                from_layout,
653                to_layout,
654                migrated_paths,
655                rollback_hint,
656                schema_version,
657                projection_version,
658            } => {
659                self.governance_migrations.push(GovernanceMigration {
660                    project_id: *project_id,
661                    from_layout: from_layout.clone(),
662                    to_layout: to_layout.clone(),
663                    migrated_paths: migrated_paths.clone(),
664                    rollback_hint: rollback_hint.clone(),
665                    schema_version: schema_version.clone(),
666                    projection_version: *projection_version,
667                    migrated_at: timestamp,
668                });
669            }
670            EventPayload::ConstitutionInitialized {
671                project_id,
672                schema_version,
673                constitution_version,
674                digest,
675                ..
676            }
677            | EventPayload::ConstitutionUpdated {
678                project_id,
679                schema_version,
680                constitution_version,
681                digest,
682                ..
683            } => {
684                if let Some(project) = self.projects.get_mut(project_id) {
685                    project.constitution_digest = Some(digest.clone());
686                    project.constitution_schema_version = Some(schema_version.clone());
687                    project.constitution_version = Some(*constitution_version);
688                    project.constitution_updated_at = Some(timestamp);
689                    project.updated_at = timestamp;
690                }
691            }
692            EventPayload::TaskGraphCreated {
693                graph_id,
694                project_id,
695                name,
696                description,
697            } => {
698                self.graphs.insert(
699                    *graph_id,
700                    TaskGraph {
701                        id: *graph_id,
702                        project_id: *project_id,
703                        name: name.clone(),
704                        description: description.clone(),
705                        state: GraphState::Draft,
706                        tasks: HashMap::new(),
707                        dependencies: HashMap::<Uuid, HashSet<Uuid>>::new(),
708                        created_at: timestamp,
709                        updated_at: timestamp,
710                    },
711                );
712            }
713            EventPayload::TaskAddedToGraph { graph_id, task } => {
714                if let Some(graph) = self.graphs.get_mut(graph_id) {
715                    graph.tasks.insert(task.id, task.clone());
716                    graph.dependencies.entry(task.id).or_default();
717                    graph.updated_at = timestamp;
718                }
719            }
720            EventPayload::DependencyAdded {
721                graph_id,
722                from_task,
723                to_task,
724            } => {
725                if let Some(graph) = self.graphs.get_mut(graph_id) {
726                    graph
727                        .dependencies
728                        .entry(*to_task)
729                        .or_default()
730                        .insert(*from_task);
731                    graph.updated_at = timestamp;
732                }
733            }
734            EventPayload::GraphTaskCheckAdded {
735                graph_id,
736                task_id,
737                check,
738            } => {
739                if let Some(graph) = self.graphs.get_mut(graph_id) {
740                    if let Some(task) = graph.tasks.get_mut(task_id) {
741                        task.criteria.checks.push(check.clone());
742                        graph.updated_at = timestamp;
743                    }
744                }
745            }
746            EventPayload::ScopeAssigned {
747                graph_id,
748                task_id,
749                scope,
750            } => {
751                if let Some(graph) = self.graphs.get_mut(graph_id) {
752                    if let Some(task) = graph.tasks.get_mut(task_id) {
753                        task.scope = Some(scope.clone());
754                        graph.updated_at = timestamp;
755                    }
756                }
757            }
758
759            EventPayload::TaskGraphValidated {
760                graph_id,
761                project_id: _,
762                valid,
763                issues: _,
764            } => {
765                if *valid {
766                    if let Some(graph) = self.graphs.get_mut(graph_id) {
767                        graph.state = GraphState::Validated;
768                        graph.updated_at = timestamp;
769                    }
770                }
771            }
772
773            EventPayload::TaskGraphLocked {
774                graph_id,
775                project_id: _,
776            } => {
777                if let Some(graph) = self.graphs.get_mut(graph_id) {
778                    graph.state = GraphState::Locked;
779                    graph.updated_at = timestamp;
780                }
781            }
782            EventPayload::TaskFlowCreated {
783                flow_id,
784                graph_id,
785                project_id,
786                name: _,
787                task_ids,
788            } => {
789                if let Some(graph) = self.graphs.get_mut(graph_id) {
790                    graph.state = GraphState::Locked;
791                    graph.updated_at = timestamp;
792                }
793
794                let mut task_executions = HashMap::new();
795                for task_id in task_ids {
796                    task_executions.insert(
797                        *task_id,
798                        TaskExecution {
799                            task_id: *task_id,
800                            state: TaskExecState::Pending,
801                            attempt_count: 0,
802                            retry_mode: RetryMode::default(),
803                            frozen_commit_sha: None,
804                            integrated_commit_sha: None,
805                            updated_at: timestamp,
806                            blocked_reason: None,
807                        },
808                    );
809                }
810
811                self.flows.insert(
812                    *flow_id,
813                    TaskFlow {
814                        id: *flow_id,
815                        graph_id: *graph_id,
816                        project_id: *project_id,
817                        base_revision: None,
818                        run_mode: RunMode::Manual,
819                        depends_on_flows: HashSet::new(),
820                        state: FlowState::Created,
821                        task_executions,
822                        created_at: timestamp,
823                        started_at: None,
824                        completed_at: None,
825                        updated_at: timestamp,
826                    },
827                );
828                self.flow_runtime_defaults.entry(*flow_id).or_default();
829            }
830            EventPayload::TaskFlowDependencyAdded {
831                flow_id,
832                depends_on_flow_id,
833            } => {
834                if let Some(flow) = self.flows.get_mut(flow_id) {
835                    flow.depends_on_flows.insert(*depends_on_flow_id);
836                    flow.updated_at = timestamp;
837                }
838            }
839            EventPayload::TaskFlowRunModeSet { flow_id, mode } => {
840                if let Some(flow) = self.flows.get_mut(flow_id) {
841                    flow.run_mode = *mode;
842                    flow.updated_at = timestamp;
843                }
844            }
845            EventPayload::TaskFlowRuntimeConfigured {
846                flow_id,
847                role,
848                adapter_name,
849                binary_path,
850                model,
851                args,
852                env,
853                timeout_ms,
854                max_parallel_tasks,
855            } => {
856                let configured = ProjectRuntimeConfig {
857                    adapter_name: adapter_name.clone(),
858                    binary_path: binary_path.clone(),
859                    model: model.clone(),
860                    args: args.clone(),
861                    env: env.clone(),
862                    timeout_ms: *timeout_ms,
863                    max_parallel_tasks: *max_parallel_tasks,
864                };
865                self.flow_runtime_defaults
866                    .entry(*flow_id)
867                    .or_default()
868                    .set(*role, Some(configured));
869                if let Some(flow) = self.flows.get_mut(flow_id) {
870                    flow.updated_at = timestamp;
871                }
872            }
873            EventPayload::TaskFlowRuntimeCleared { flow_id, role } => {
874                self.flow_runtime_defaults
875                    .entry(*flow_id)
876                    .or_default()
877                    .set(*role, None);
878                if let Some(flow) = self.flows.get_mut(flow_id) {
879                    flow.updated_at = timestamp;
880                }
881            }
882            EventPayload::TaskFlowStarted {
883                flow_id,
884                base_revision,
885            } => {
886                if let Some(flow) = self.flows.get_mut(flow_id) {
887                    flow.state = FlowState::Running;
888                    flow.started_at = Some(timestamp);
889                    flow.base_revision.clone_from(base_revision);
890                    flow.updated_at = timestamp;
891                }
892            }
893            EventPayload::TaskFlowPaused {
894                flow_id,
895                running_tasks: _,
896            } => {
897                if let Some(flow) = self.flows.get_mut(flow_id) {
898                    flow.state = FlowState::Paused;
899                    flow.updated_at = timestamp;
900                }
901            }
902            EventPayload::TaskFlowResumed { flow_id } => {
903                if let Some(flow) = self.flows.get_mut(flow_id) {
904                    flow.state = FlowState::Running;
905                    flow.updated_at = timestamp;
906                }
907            }
908            EventPayload::TaskFlowCompleted { flow_id } => {
909                if let Some(flow) = self.flows.get_mut(flow_id) {
910                    flow.state = FlowState::Completed;
911                    flow.completed_at = Some(timestamp);
912                    flow.updated_at = timestamp;
913                }
914            }
915
916            EventPayload::FlowFrozenForMerge { flow_id } => {
917                if let Some(flow) = self.flows.get_mut(flow_id) {
918                    flow.state = FlowState::FrozenForMerge;
919                    flow.updated_at = timestamp;
920                }
921            }
922            EventPayload::TaskFlowAborted {
923                flow_id,
924                reason: _,
925                forced: _,
926            } => {
927                if let Some(flow) = self.flows.get_mut(flow_id) {
928                    flow.state = FlowState::Aborted;
929                    flow.completed_at = Some(timestamp);
930                    flow.updated_at = timestamp;
931                }
932            }
933            EventPayload::TaskReady { flow_id, task_id } => {
934                if let Some(flow) = self.flows.get_mut(flow_id) {
935                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
936                        exec.state = TaskExecState::Ready;
937                        exec.blocked_reason = None;
938                        exec.updated_at = timestamp;
939                    }
940                    flow.updated_at = timestamp;
941                }
942            }
943            EventPayload::TaskBlocked {
944                flow_id,
945                task_id,
946                reason,
947            } => {
948                if let Some(flow) = self.flows.get_mut(flow_id) {
949                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
950                        exec.state = TaskExecState::Pending;
951                        exec.blocked_reason.clone_from(reason);
952                        exec.updated_at = timestamp;
953                    }
954                    flow.updated_at = timestamp;
955                }
956            }
957            EventPayload::TaskExecutionStateChanged {
958                flow_id,
959                task_id,
960                attempt_id: _,
961                from: _,
962                to,
963            } => {
964                if let Some(flow) = self.flows.get_mut(flow_id) {
965                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
966                        exec.state = *to;
967                        exec.updated_at = timestamp;
968                        exec.blocked_reason = None;
969                        if *to == TaskExecState::Running {
970                            exec.attempt_count += 1;
971                        }
972                    }
973                    flow.updated_at = timestamp;
974                }
975            }
976
977            EventPayload::AttemptStarted {
978                flow_id,
979                task_id,
980                attempt_id,
981                attempt_number,
982            } => {
983                self.attempts.insert(
984                    *attempt_id,
985                    AttemptState {
986                        id: *attempt_id,
987                        flow_id: *flow_id,
988                        task_id: *task_id,
989                        attempt_number: *attempt_number,
990                        started_at: timestamp,
991                        baseline_id: None,
992                        diff_id: None,
993                        check_results: Vec::new(),
994                        checkpoints: Vec::new(),
995                        all_checkpoints_completed: false,
996                    },
997                );
998            }
999
1000            EventPayload::CheckpointDeclared {
1001                attempt_id,
1002                checkpoint_id,
1003                order,
1004                total,
1005                ..
1006            } => {
1007                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1008                    let exists = attempt
1009                        .checkpoints
1010                        .iter()
1011                        .any(|cp| cp.checkpoint_id == *checkpoint_id);
1012                    if !exists {
1013                        attempt.checkpoints.push(AttemptCheckpoint {
1014                            checkpoint_id: checkpoint_id.clone(),
1015                            order: *order,
1016                            total: *total,
1017                            state: AttemptCheckpointState::Declared,
1018                            commit_hash: None,
1019                            completed_at: None,
1020                            summary: None,
1021                        });
1022                        attempt.checkpoints.sort_by_key(|cp| cp.order);
1023                    }
1024                }
1025            }
1026
1027            EventPayload::CheckpointActivated {
1028                attempt_id,
1029                checkpoint_id,
1030                ..
1031            } => {
1032                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1033                    for cp in &mut attempt.checkpoints {
1034                        if cp.checkpoint_id == *checkpoint_id {
1035                            cp.state = AttemptCheckpointState::Active;
1036                        } else if cp.state != AttemptCheckpointState::Completed {
1037                            cp.state = AttemptCheckpointState::Declared;
1038                        }
1039                    }
1040                }
1041            }
1042
1043            EventPayload::CheckpointCompleted {
1044                attempt_id,
1045                checkpoint_id,
1046                commit_hash,
1047                timestamp,
1048                summary,
1049                ..
1050            } => {
1051                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1052                    if let Some(cp) = attempt
1053                        .checkpoints
1054                        .iter_mut()
1055                        .find(|cp| cp.checkpoint_id == *checkpoint_id)
1056                    {
1057                        cp.state = AttemptCheckpointState::Completed;
1058                        cp.commit_hash = Some(commit_hash.clone());
1059                        cp.completed_at = Some(*timestamp);
1060                        cp.summary.clone_from(summary);
1061                    }
1062                }
1063            }
1064
1065            EventPayload::AllCheckpointsCompleted { attempt_id, .. } => {
1066                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1067                    attempt.all_checkpoints_completed = true;
1068                }
1069            }
1070
1071            EventPayload::BaselineCaptured {
1072                attempt_id,
1073                baseline_id,
1074                ..
1075            } => {
1076                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1077                    attempt.baseline_id = Some(*baseline_id);
1078                }
1079            }
1080
1081            EventPayload::DiffComputed {
1082                attempt_id,
1083                diff_id,
1084                ..
1085            } => {
1086                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1087                    attempt.diff_id = Some(*diff_id);
1088                }
1089            }
1090
1091            EventPayload::CheckCompleted {
1092                attempt_id,
1093                check_name,
1094                passed,
1095                exit_code,
1096                output,
1097                duration_ms,
1098                required,
1099                ..
1100            } => {
1101                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
1102                    attempt.check_results.push(CheckResult {
1103                        name: check_name.clone(),
1104                        passed: *passed,
1105                        exit_code: *exit_code,
1106                        output: output.clone(),
1107                        duration_ms: *duration_ms,
1108                        required: *required,
1109                    });
1110                }
1111            }
1112
1113            EventPayload::CheckStarted { .. }
1114            | EventPayload::ConstitutionValidated { .. }
1115            | EventPayload::TemplateInstantiated { .. }
1116            | EventPayload::ErrorOccurred { .. }
1117            | EventPayload::TaskExecutionStarted { .. }
1118            | EventPayload::TaskExecutionSucceeded { .. }
1119            | EventPayload::TaskExecutionFailed { .. }
1120            | EventPayload::MergeConflictDetected { .. }
1121            | EventPayload::MergeCheckStarted { .. }
1122            | EventPayload::MergeCheckCompleted { .. }
1123            | EventPayload::RuntimeStarted { .. }
1124            | EventPayload::RuntimeOutputChunk { .. }
1125            | EventPayload::RuntimeInputProvided { .. }
1126            | EventPayload::RuntimeInterrupted { .. }
1127            | EventPayload::RuntimeExited { .. }
1128            | EventPayload::RuntimeTerminated { .. }
1129            | EventPayload::RuntimeErrorClassified { .. }
1130            | EventPayload::RuntimeRecoveryScheduled { .. }
1131            | EventPayload::RuntimeFilesystemObserved { .. }
1132            | EventPayload::RuntimeCommandObserved { .. }
1133            | EventPayload::RuntimeToolCallObserved { .. }
1134            | EventPayload::RuntimeTodoSnapshotUpdated { .. }
1135            | EventPayload::RuntimeNarrativeOutputObserved { .. }
1136            | EventPayload::FileModified { .. }
1137            | EventPayload::CheckpointCommitCreated { .. }
1138            | EventPayload::ScopeValidated { .. }
1139            | EventPayload::ScopeViolationDetected { .. }
1140            | EventPayload::ScopeConflictDetected { .. }
1141            | EventPayload::TaskSchedulingDeferred { .. }
1142            | EventPayload::RetryContextAssembled { .. }
1143            | EventPayload::FlowIntegrationLockAcquired { .. }
1144            | EventPayload::WorktreeCleanupPerformed { .. }
1145            | EventPayload::Unknown => {}
1146
1147            EventPayload::TaskRetryRequested {
1148                task_id,
1149                reset_count,
1150                retry_mode,
1151            } => {
1152                let flow_id = event.metadata.correlation.flow_id;
1153                let mut candidate_flow_ids = Vec::new();
1154
1155                if let Some(fid) = flow_id {
1156                    candidate_flow_ids.push(fid);
1157                } else {
1158                    for (fid, flow) in &self.flows {
1159                        if flow.task_executions.contains_key(task_id) {
1160                            candidate_flow_ids.push(*fid);
1161                        }
1162                    }
1163                }
1164
1165                for fid in candidate_flow_ids {
1166                    if let Some(flow) = self.flows.get_mut(&fid) {
1167                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
1168                            exec.state = TaskExecState::Pending;
1169                            exec.blocked_reason = None;
1170                            exec.updated_at = timestamp;
1171                            exec.retry_mode = *retry_mode;
1172                            if *reset_count {
1173                                exec.attempt_count = 0;
1174                            }
1175                            flow.updated_at = timestamp;
1176                            break;
1177                        }
1178                    }
1179                }
1180            }
1181
1182            EventPayload::TaskAborted { task_id, reason: _ } => {
1183                let flow_id = event.metadata.correlation.flow_id;
1184                let mut candidate_flow_ids = Vec::new();
1185
1186                if let Some(fid) = flow_id {
1187                    candidate_flow_ids.push(fid);
1188                } else {
1189                    for (fid, flow) in &self.flows {
1190                        if flow.task_executions.contains_key(task_id) {
1191                            candidate_flow_ids.push(*fid);
1192                        }
1193                    }
1194                }
1195
1196                for fid in candidate_flow_ids {
1197                    if let Some(flow) = self.flows.get_mut(&fid) {
1198                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
1199                            exec.state = TaskExecState::Failed;
1200                            exec.blocked_reason = None;
1201                            exec.updated_at = timestamp;
1202                            flow.updated_at = timestamp;
1203                            break;
1204                        }
1205                    }
1206                }
1207            }
1208
1209            EventPayload::HumanOverride {
1210                task_id,
1211                override_type: _,
1212                decision,
1213                reason: _,
1214                user: _,
1215            } => {
1216                let flow_id = event.metadata.correlation.flow_id;
1217                let mut candidate_flow_ids = Vec::new();
1218
1219                if let Some(fid) = flow_id {
1220                    candidate_flow_ids.push(fid);
1221                } else {
1222                    for (fid, flow) in &self.flows {
1223                        if flow.task_executions.contains_key(task_id) {
1224                            candidate_flow_ids.push(*fid);
1225                        }
1226                    }
1227                }
1228
1229                let new_state = if decision == "pass" {
1230                    TaskExecState::Success
1231                } else {
1232                    TaskExecState::Failed
1233                };
1234
1235                for fid in candidate_flow_ids {
1236                    if let Some(flow) = self.flows.get_mut(&fid) {
1237                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
1238                            exec.state = new_state;
1239                            exec.blocked_reason = None;
1240                            exec.updated_at = timestamp;
1241                            flow.updated_at = timestamp;
1242                            break;
1243                        }
1244                    }
1245                }
1246            }
1247
1248            EventPayload::MergePrepared {
1249                flow_id,
1250                target_branch,
1251                conflicts,
1252            } => {
1253                self.merge_states.insert(
1254                    *flow_id,
1255                    MergeState {
1256                        flow_id: *flow_id,
1257                        status: MergeStatus::Prepared,
1258                        target_branch: target_branch.clone(),
1259                        conflicts: conflicts.clone(),
1260                        commits: Vec::new(),
1261                        updated_at: timestamp,
1262                    },
1263                );
1264            }
1265            EventPayload::MergeApproved { flow_id, user: _ } => {
1266                if let Some(ms) = self.merge_states.get_mut(flow_id) {
1267                    ms.status = MergeStatus::Approved;
1268                    ms.updated_at = timestamp;
1269                }
1270            }
1271            EventPayload::MergeCompleted { flow_id, commits } => {
1272                if let Some(ms) = self.merge_states.get_mut(flow_id) {
1273                    ms.status = MergeStatus::Completed;
1274                    commits.clone_into(&mut ms.commits);
1275                    ms.updated_at = timestamp;
1276                }
1277
1278                if let Some(flow) = self.flows.get_mut(flow_id) {
1279                    flow.state = FlowState::Merged;
1280                    flow.completed_at = Some(timestamp);
1281                    flow.updated_at = timestamp;
1282                }
1283            }
1284        }
1285    }
1286
1287    /// Replays a sequence of events to produce state.
1288    /// Deterministic: same events → same state.
1289    #[must_use]
1290    pub fn replay(events: &[Event]) -> Self {
1291        let mut state = Self::new();
1292        for event in events {
1293            state.apply_mut(event);
1294        }
1295        state
1296    }
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::CorrelationIds;

    /// Builds a `ProjectCreated` event (no description) correlated to `project_id`.
    fn project_created(project_id: Uuid, name: &str) -> Event {
        Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: name.to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        )
    }

    /// Replaying the same create + update log twice yields matching projects.
    #[test]
    fn replay_is_deterministic() {
        let project_id = Uuid::new_v4();

        // Events are constructed in log order: creation first, then the rename.
        let created = project_created(project_id, "test");
        let renamed = Event::new(
            EventPayload::ProjectUpdated {
                id: project_id,
                name: Some("updated".to_string()),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        );
        let history = [created, renamed];

        let first = AppState::replay(&history);
        let second = AppState::replay(&history);

        assert_eq!(first.projects.len(), second.projects.len());

        let first_name = &first.projects.get(&project_id).unwrap().name;
        let second_name = &second.projects.get(&project_id).unwrap().name;
        assert_eq!(first_name, second_name);
    }

    /// Replaying a single-event log twice produces exactly one project each time.
    #[test]
    fn replay_is_idempotent() {
        let project_id = Uuid::new_v4();
        let history = [project_created(project_id, "test")];

        let first = AppState::replay(&history);
        let second = AppState::replay(&history);

        assert_eq!(first.projects.len(), 1);
        assert_eq!(second.projects.len(), 1);
    }

    /// A task that is created and then closed ends up in `TaskState::Closed`.
    #[test]
    fn task_lifecycle() {
        let project_id = Uuid::new_v4();
        let task_id = Uuid::new_v4();

        // Wraps a payload in an event correlated to this project/task pair.
        let task_event = |payload: EventPayload| {
            Event::new(payload, CorrelationIds::for_task(project_id, task_id))
        };

        let history = [
            project_created(project_id, "proj"),
            task_event(EventPayload::TaskCreated {
                id: task_id,
                project_id,
                title: "task1".to_string(),
                description: None,
                scope: None,
            }),
            task_event(EventPayload::TaskClosed {
                id: task_id,
                reason: None,
            }),
        ];

        let state = AppState::replay(&history);
        let closed_task = state.tasks.get(&task_id).unwrap();

        assert_eq!(closed_task.state, TaskState::Closed);
    }
}