1use super::events::{Event, EventPayload, RuntimeRole};
6use super::flow::{FlowState, RetryMode, RunMode, TaskExecState, TaskExecution, TaskFlow};
7use super::graph::{GraphState, TaskGraph};
8use super::scope::{RepoAccessMode, Scope};
9use super::verification::CheckResult;
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use uuid::Uuid;
14
/// Serde fallback for `ProjectRuntimeConfig::max_parallel_tasks`.
///
/// Referenced by `#[serde(default = "default_max_parallel_tasks")]`, so a
/// serialized config that omits the field deserializes with a value of one.
const fn default_max_parallel_tasks() -> u16 {
    const FALLBACK_PARALLELISM: u16 = 1;
    FALLBACK_PARALLELISM
}
18
/// Lifecycle state of a single checkpoint inside an attempt.
///
/// Serialized using `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AttemptCheckpointState {
    /// State given to a checkpoint when a `CheckpointDeclared` event is applied.
    Declared,
    /// State given to the checkpoint named by a `CheckpointActivated` event.
    Active,
    /// State given by `CheckpointCompleted`; later activations of other
    /// checkpoints do not reset it.
    Completed,
}
26
/// A checkpoint tracked for one attempt, as projected from checkpoint events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AttemptCheckpoint {
    /// Identifier of the checkpoint; `AppState::apply_mut` refuses to add a
    /// second checkpoint with the same id to the same attempt.
    pub checkpoint_id: String,
    /// Sort key: an attempt's checkpoints are kept sorted by this value.
    pub order: u32,
    /// Copied verbatim from the `CheckpointDeclared` event.
    /// NOTE(review): presumably the total number of checkpoints — confirm.
    pub total: u32,
    /// Current lifecycle state of the checkpoint.
    pub state: AttemptCheckpointState,
    /// Commit hash recorded when the checkpoint is completed; `None` before.
    #[serde(default)]
    pub commit_hash: Option<String>,
    /// Completion time, taken from the `timestamp` field of the
    /// `CheckpointCompleted` payload; `None` before completion.
    #[serde(default)]
    pub completed_at: Option<DateTime<Utc>>,
    /// Optional summary copied from the `CheckpointCompleted` payload.
    #[serde(default)]
    pub summary: Option<String>,
}
40
/// Projected state of one execution attempt of a task within a flow.
///
/// Created by `AttemptStarted` and enriched by the baseline, diff, check and
/// checkpoint events handled in `AppState::apply_mut`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AttemptState {
    /// Attempt identifier (also the key in `AppState::attempts`).
    pub id: Uuid,
    /// Flow the attempt belongs to.
    pub flow_id: Uuid,
    /// Task the attempt belongs to.
    pub task_id: Uuid,
    /// Attempt number copied from the `AttemptStarted` event.
    pub attempt_number: u32,
    /// Timestamp of the `AttemptStarted` event.
    pub started_at: DateTime<Utc>,
    /// Set by `BaselineCaptured`; `None` until then.
    pub baseline_id: Option<Uuid>,
    /// Set by `DiffComputed`; `None` until then.
    pub diff_id: Option<Uuid>,
    /// One entry appended per `CheckCompleted` event, in application order.
    #[serde(default)]
    pub check_results: Vec<CheckResult>,
    /// Declared checkpoints, kept sorted by `order`.
    #[serde(default)]
    pub checkpoints: Vec<AttemptCheckpoint>,
    /// Set to `true` by `AllCheckpointsCompleted`.
    #[serde(default)]
    pub all_checkpoints_completed: bool,
}
57
/// Runtime configuration used for project, flow and global role defaults.
///
/// Differs from `TaskRuntimeConfig` only by the extra `max_parallel_tasks`
/// field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProjectRuntimeConfig {
    /// Name of the runtime adapter.
    pub adapter_name: String,
    /// NOTE(review): presumably the path of the runtime executable — confirm.
    pub binary_path: String,
    /// Optional model identifier; absent in serialized input means `None`.
    #[serde(default)]
    pub model: Option<String>,
    /// Extra arguments; absent in serialized input means an empty list.
    #[serde(default)]
    pub args: Vec<String>,
    /// Environment variables; absent in serialized input means an empty map.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Timeout value; the field name indicates milliseconds.
    pub timeout_ms: u64,
    /// Parallelism limit; defaults to `default_max_parallel_tasks()` when the
    /// field is missing from serialized input.
    #[serde(default = "default_max_parallel_tasks")]
    pub max_parallel_tasks: u16,
}
72
/// Runtime configuration used for per-task overrides.
///
/// Has the same fields as `ProjectRuntimeConfig` except that it carries no
/// `max_parallel_tasks`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TaskRuntimeConfig {
    /// Name of the runtime adapter.
    pub adapter_name: String,
    /// NOTE(review): presumably the path of the runtime executable — confirm.
    pub binary_path: String,
    /// Optional model identifier; absent in serialized input means `None`.
    #[serde(default)]
    pub model: Option<String>,
    /// Extra arguments; absent in serialized input means an empty list.
    #[serde(default)]
    pub args: Vec<String>,
    /// Environment variables; absent in serialized input means an empty map.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Timeout value; the field name indicates milliseconds.
    pub timeout_ms: u64,
}
85
/// Optional default runtime configuration per `RuntimeRole`.
///
/// `Default` yields both roles unset.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RuntimeRoleDefaults {
    /// Configuration for `RuntimeRole::Worker`, if any.
    #[serde(default)]
    pub worker: Option<ProjectRuntimeConfig>,
    /// Configuration for `RuntimeRole::Validator`, if any.
    #[serde(default)]
    pub validator: Option<ProjectRuntimeConfig>,
}
93
94impl RuntimeRoleDefaults {
95 pub fn set(&mut self, role: RuntimeRole, config: Option<ProjectRuntimeConfig>) {
96 match role {
97 RuntimeRole::Worker => self.worker = config,
98 RuntimeRole::Validator => self.validator = config,
99 }
100 }
101}
102
/// Record that governance storage was initialized for a project.
///
/// Inserted by `GovernanceProjectStorageInitialized`, keyed by `project_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GovernanceProjectStorage {
    /// Project whose storage was initialized.
    pub project_id: Uuid,
    /// Schema version string copied from the event.
    pub schema_version: String,
    /// Projection version copied from the event.
    pub projection_version: u32,
    /// Root path copied from the event.
    pub root_path: String,
    /// Timestamp of the initializing event.
    pub initialized_at: DateTime<Utc>,
}
111
/// Latest known state of a governance artifact.
///
/// Stored in `AppState::governance_artifacts` under the key
/// `"{project}::{scope}::{artifact_kind}::{artifact_key}"`, where `{project}`
/// is the project id or the literal `global` when `project_id` is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GovernanceArtifact {
    /// Owning project, or `None` for artifacts keyed as `global`.
    #[serde(default)]
    pub project_id: Option<Uuid>,
    /// Scope string copied from the upsert event.
    pub scope: String,
    /// Artifact kind copied from the upsert event.
    pub artifact_kind: String,
    /// Artifact key copied from the upsert event.
    pub artifact_key: String,
    /// Path copied from the upsert event.
    pub path: String,
    /// Revision copied from the upsert event; `0` when absent in serialized input.
    #[serde(default)]
    pub revision: u64,
    /// Schema version string copied from the upsert event.
    pub schema_version: String,
    /// Projection version copied from the upsert event.
    pub projection_version: u32,
    /// Timestamp of the most recent upsert event.
    pub updated_at: DateTime<Utc>,
}
126
/// Latest attachment status of a governance artifact for a task.
///
/// Stored in `AppState::governance_attachments` under the key
/// `"{project_id}::{task_id}::{artifact_kind}::{artifact_key}"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GovernanceAttachment {
    /// Project the attachment belongs to.
    pub project_id: Uuid,
    /// Task the artifact is attached to or detached from.
    pub task_id: Uuid,
    /// Artifact kind copied from the event.
    pub artifact_kind: String,
    /// Artifact key copied from the event.
    pub artifact_key: String,
    /// Attachment flag copied from the most recent lifecycle event.
    pub attached: bool,
    /// Schema version string copied from the event.
    pub schema_version: String,
    /// Projection version copied from the event.
    pub projection_version: u32,
    /// Timestamp of the most recent lifecycle event.
    pub updated_at: DateTime<Utc>,
}
138
/// One governance storage migration, appended per `GovernanceStorageMigrated`
/// event to `AppState::governance_migrations`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GovernanceMigration {
    /// Project the migration applied to, if the event named one.
    #[serde(default)]
    pub project_id: Option<Uuid>,
    /// Source layout name copied from the event.
    pub from_layout: String,
    /// Target layout name copied from the event.
    pub to_layout: String,
    /// Paths reported as migrated; empty when absent in serialized input.
    #[serde(default)]
    pub migrated_paths: Vec<String>,
    /// Rollback hint copied from the event.
    pub rollback_hint: String,
    /// Schema version string copied from the event.
    pub schema_version: String,
    /// Projection version copied from the event.
    pub projection_version: u32,
    /// Timestamp of the migration event.
    pub migrated_at: DateTime<Utc>,
}
152
/// Optional per-task runtime override for each `RuntimeRole`.
///
/// `Default` yields both roles unset.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct TaskRuntimeRoleOverrides {
    /// Override for `RuntimeRole::Worker`, if any.
    #[serde(default)]
    pub worker: Option<TaskRuntimeConfig>,
    /// Override for `RuntimeRole::Validator`, if any.
    #[serde(default)]
    pub validator: Option<TaskRuntimeConfig>,
}
160
161impl TaskRuntimeRoleOverrides {
162 pub fn set(&mut self, role: RuntimeRole, config: Option<TaskRuntimeConfig>) {
163 match role {
164 RuntimeRole::Worker => self.worker = config,
165 RuntimeRole::Validator => self.validator = config,
166 }
167 }
168}
169
/// Projected state of a project.
///
/// Created by `ProjectCreated` and updated by the project, repository,
/// runtime and constitution events handled in `AppState::apply_mut`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Project {
    /// Project identifier (also the key in `AppState::projects`).
    pub id: Uuid,
    /// Project name.
    pub name: String,
    /// Optional description.
    pub description: Option<String>,
    /// Timestamp of the `ProjectCreated` event.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the most recent event that modified the project.
    pub updated_at: DateTime<Utc>,
    /// Repositories attached to the project, in attachment order.
    pub repositories: Vec<Repository>,
    /// Single runtime configuration; the event handlers write it together
    /// with `runtime_defaults.worker`.
    #[serde(default)]
    pub runtime: Option<ProjectRuntimeConfig>,
    /// Per-role runtime defaults for the project.
    #[serde(default)]
    pub runtime_defaults: RuntimeRoleDefaults,
    /// Digest from the latest constitution initialized/updated event.
    #[serde(default)]
    pub constitution_digest: Option<String>,
    /// Schema version from the latest constitution event.
    #[serde(default)]
    pub constitution_schema_version: Option<String>,
    /// Constitution version from the latest constitution event.
    #[serde(default)]
    pub constitution_version: Option<u32>,
    /// Timestamp of the latest constitution event.
    #[serde(default)]
    pub constitution_updated_at: Option<DateTime<Utc>>,
}
192
/// A repository attached to a project.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Repository {
    /// Repository name; `RepositoryDetached` removes entries by this name.
    pub name: String,
    /// Repository path copied from the `RepositoryAttached` event.
    pub path: String,
    /// Access mode; falls back to `RepoAccessMode::default()` when absent in
    /// serialized input.
    #[serde(default)]
    pub access_mode: RepoAccessMode,
}
201
/// Open/closed state of a task. Serialized using lowercase variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskState {
    /// State given to a task by `TaskCreated`.
    Open,
    /// State given to a task by `TaskClosed`.
    Closed,
}
209
/// Projected state of a task.
///
/// Created by `TaskCreated` and updated by the task events handled in
/// `AppState::apply_mut`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Task {
    /// Task identifier (also the key in `AppState::tasks`).
    pub id: Uuid,
    /// Project the task belongs to.
    pub project_id: Uuid,
    /// Task title.
    pub title: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional scope copied from the `TaskCreated` event.
    #[serde(default)]
    pub scope: Option<Scope>,
    /// Single runtime override; the event handlers write and clear it
    /// together with `runtime_overrides.worker`.
    #[serde(default)]
    pub runtime_override: Option<TaskRuntimeConfig>,
    /// Per-role runtime overrides for the task.
    #[serde(default)]
    pub runtime_overrides: TaskRuntimeRoleOverrides,
    /// Run mode; `TaskCreated` initializes it to `RunMode::Auto`, while
    /// deserialization of input lacking the field uses `RunMode::default()`.
    #[serde(default)]
    pub run_mode: RunMode,
    /// Whether the task is open or closed.
    pub state: TaskState,
    /// Timestamp of the `TaskCreated` event.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the most recent event that modified the task.
    pub updated_at: DateTime<Utc>,
}
229
/// Progress of a flow's merge. Serialized using lowercase variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MergeStatus {
    /// Set when a `MergePrepared` event creates the merge state.
    Prepared,
    /// Set by `MergeApproved`.
    Approved,
    /// Set by `MergeCompleted`.
    Completed,
}
238
/// Projected merge state of a flow, keyed by flow id in
/// `AppState::merge_states`.
///
/// Unlike most sibling types in this file it does not derive `PartialEq`/`Eq`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MergeState {
    /// Flow being merged.
    pub flow_id: Uuid,
    /// Current merge status.
    pub status: MergeStatus,
    /// Target branch copied from the `MergePrepared` event, if any.
    pub target_branch: Option<String>,
    /// Conflicts copied from the `MergePrepared` event.
    pub conflicts: Vec<String>,
    /// Commits copied from the `MergeCompleted` event; empty until then.
    pub commits: Vec<String>,
    /// Timestamp of the most recent merge event applied to this state.
    pub updated_at: DateTime<Utc>,
}
249
/// In-memory projection of the whole application, built by applying events.
///
/// `Default` yields an empty state with no entities and no runtime defaults.
#[derive(Debug, Default, Clone)]
pub struct AppState {
    /// Projects keyed by project id.
    pub projects: HashMap<Uuid, Project>,
    /// Governance storage records keyed by project id.
    pub governance_projects: HashMap<Uuid, GovernanceProjectStorage>,
    /// Governance artifacts keyed by
    /// `"{project id or global}::{scope}::{artifact_kind}::{artifact_key}"`.
    pub governance_artifacts: HashMap<String, GovernanceArtifact>,
    /// Governance attachments keyed by
    /// `"{project_id}::{task_id}::{artifact_kind}::{artifact_key}"`.
    pub governance_attachments: HashMap<String, GovernanceAttachment>,
    /// Governance migrations in the order their events were applied.
    pub governance_migrations: Vec<GovernanceMigration>,
    /// Tasks keyed by task id.
    pub tasks: HashMap<Uuid, Task>,
    /// Task graphs keyed by graph id.
    pub graphs: HashMap<Uuid, TaskGraph>,
    /// Task flows keyed by flow id.
    pub flows: HashMap<Uuid, TaskFlow>,
    /// Runtime defaults set by `GlobalRuntimeConfigured`.
    pub global_runtime_defaults: RuntimeRoleDefaults,
    /// Per-flow runtime defaults keyed by flow id.
    pub flow_runtime_defaults: HashMap<Uuid, RuntimeRoleDefaults>,
    /// Merge states keyed by flow id.
    pub merge_states: HashMap<Uuid, MergeState>,
    /// Attempts keyed by attempt id.
    pub attempts: HashMap<Uuid, AttemptState>,
}
266
267impl AppState {
268 #[must_use]
270 pub fn new() -> Self {
271 Self::default()
272 }
273
274 #[must_use]
276 pub fn apply(mut self, event: &Event) -> Self {
277 self.apply_mut(event);
278 self
279 }
280
    /// Applies a single event to this state in place.
    ///
    /// The event timestamp (`event.timestamp()`) is used for every
    /// `created_at` / `updated_at` style field written here. Most handlers
    /// that update an existing entity silently do nothing when the referenced
    /// id is not present in the state. Purely informational events are
    /// accepted and leave the state untouched.
    #[allow(clippy::too_many_lines)]
    pub fn apply_mut(&mut self, event: &Event) {
        let timestamp = event.timestamp();

        match &event.payload {
            // ---- Projects -------------------------------------------------
            // Inserts (or replaces) the project with empty repositories and no
            // runtime or constitution data.
            EventPayload::ProjectCreated {
                id,
                name,
                description,
            } => {
                self.projects.insert(
                    *id,
                    Project {
                        id: *id,
                        name: name.clone(),
                        description: description.clone(),
                        created_at: timestamp,
                        updated_at: timestamp,
                        repositories: Vec::new(),
                        runtime: None,
                        runtime_defaults: RuntimeRoleDefaults::default(),
                        constitution_digest: None,
                        constitution_schema_version: None,
                        constitution_version: None,
                        constitution_updated_at: None,
                    },
                );
            }
            // Only fields present in the event are overwritten; a missing
            // description leaves the existing one in place (it is never cleared).
            EventPayload::ProjectUpdated {
                id,
                name,
                description,
            } => {
                if let Some(project) = self.projects.get_mut(id) {
                    if let Some(n) = name {
                        n.clone_into(&mut project.name);
                    }
                    if let Some(d) = description {
                        project.description = Some(d.clone());
                    }
                    project.updated_at = timestamp;
                }
            }

            // Records the frozen commit for a task execution; the flow is only
            // touched when the execution exists.
            EventPayload::TaskExecutionFrozen {
                flow_id,
                task_id,
                commit_sha,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
                        exec.frozen_commit_sha.clone_from(commit_sha);
                        exec.updated_at = timestamp;
                        flow.updated_at = timestamp;
                    }
                }
            }

            // Records the integrated commit for a task execution.
            EventPayload::TaskIntegratedIntoFlow {
                flow_id,
                task_id,
                commit_sha,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
                        exec.integrated_commit_sha.clone_from(commit_sha);
                        exec.updated_at = timestamp;
                        flow.updated_at = timestamp;
                    }
                }
            }
            // ---- Runtime configuration (project / global) -----------------
            // Role-less project configuration: written to both `runtime` and
            // the worker slot of `runtime_defaults`.
            EventPayload::ProjectRuntimeConfigured {
                project_id,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
                max_parallel_tasks,
            } => {
                if let Some(project) = self.projects.get_mut(project_id) {
                    let configured = ProjectRuntimeConfig {
                        adapter_name: adapter_name.clone(),
                        binary_path: binary_path.clone(),
                        model: model.clone(),
                        args: args.clone(),
                        env: env.clone(),
                        timeout_ms: *timeout_ms,
                        max_parallel_tasks: *max_parallel_tasks,
                    };
                    project.runtime = Some(configured.clone());
                    project.runtime_defaults.worker = Some(configured);
                    project.updated_at = timestamp;
                }
            }
            // Role-specific project configuration; the worker role is also
            // mirrored into `runtime`.
            EventPayload::ProjectRuntimeRoleConfigured {
                project_id,
                role,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
                max_parallel_tasks,
            } => {
                if let Some(project) = self.projects.get_mut(project_id) {
                    let configured = ProjectRuntimeConfig {
                        adapter_name: adapter_name.clone(),
                        binary_path: binary_path.clone(),
                        model: model.clone(),
                        args: args.clone(),
                        env: env.clone(),
                        timeout_ms: *timeout_ms,
                        max_parallel_tasks: *max_parallel_tasks,
                    };
                    project
                        .runtime_defaults
                        .set(*role, Some(configured.clone()));
                    if *role == RuntimeRole::Worker {
                        project.runtime = Some(configured);
                    }
                    project.updated_at = timestamp;
                }
            }
            // Global per-role default; always applied.
            EventPayload::GlobalRuntimeConfigured {
                role,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
                max_parallel_tasks,
            } => {
                let configured = ProjectRuntimeConfig {
                    adapter_name: adapter_name.clone(),
                    binary_path: binary_path.clone(),
                    model: model.clone(),
                    args: args.clone(),
                    env: env.clone(),
                    timeout_ms: *timeout_ms,
                    max_parallel_tasks: *max_parallel_tasks,
                };
                self.global_runtime_defaults.set(*role, Some(configured));
            }
            // ---- Tasks ----------------------------------------------------
            // Inserts (or replaces) the task as open, with no runtime
            // overrides and `RunMode::Auto`.
            EventPayload::TaskCreated {
                id,
                project_id,
                title,
                description,
                scope,
            } => {
                self.tasks.insert(
                    *id,
                    Task {
                        id: *id,
                        project_id: *project_id,
                        title: title.clone(),
                        description: description.clone(),
                        scope: scope.clone(),
                        runtime_override: None,
                        runtime_overrides: TaskRuntimeRoleOverrides::default(),
                        run_mode: RunMode::Auto,
                        state: TaskState::Open,
                        created_at: timestamp,
                        updated_at: timestamp,
                    },
                );
            }
            // Only fields present in the event are overwritten.
            EventPayload::TaskUpdated {
                id,
                title,
                description,
            } => {
                if let Some(task) = self.tasks.get_mut(id) {
                    if let Some(t) = title {
                        t.clone_into(&mut task.title);
                    }
                    if let Some(d) = description {
                        task.description = Some(d.clone());
                    }
                    task.updated_at = timestamp;
                }
            }
            // Role-less task override: written to both `runtime_override` and
            // the worker slot of `runtime_overrides`.
            EventPayload::TaskRuntimeConfigured {
                task_id,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
            } => {
                if let Some(task) = self.tasks.get_mut(task_id) {
                    let configured = TaskRuntimeConfig {
                        adapter_name: adapter_name.clone(),
                        binary_path: binary_path.clone(),
                        model: model.clone(),
                        args: args.clone(),
                        env: env.clone(),
                        timeout_ms: *timeout_ms,
                    };
                    task.runtime_override = Some(configured.clone());
                    task.runtime_overrides.worker = Some(configured);
                    task.updated_at = timestamp;
                }
            }
            // Role-specific task override; the worker role is also mirrored
            // into `runtime_override`.
            EventPayload::TaskRuntimeRoleConfigured {
                task_id,
                role,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
            } => {
                if let Some(task) = self.tasks.get_mut(task_id) {
                    let configured = TaskRuntimeConfig {
                        adapter_name: adapter_name.clone(),
                        binary_path: binary_path.clone(),
                        model: model.clone(),
                        args: args.clone(),
                        env: env.clone(),
                        timeout_ms: *timeout_ms,
                    };
                    task.runtime_overrides.set(*role, Some(configured.clone()));
                    if *role == RuntimeRole::Worker {
                        task.runtime_override = Some(configured);
                    }
                    task.updated_at = timestamp;
                }
            }
            // Clears the role-less override and the worker slot; the validator
            // slot is left as is.
            EventPayload::TaskRuntimeCleared { task_id } => {
                if let Some(task) = self.tasks.get_mut(task_id) {
                    task.runtime_override = None;
                    task.runtime_overrides.worker = None;
                    task.updated_at = timestamp;
                }
            }
            // Clears one role; clearing the worker also clears `runtime_override`.
            EventPayload::TaskRuntimeRoleCleared { task_id, role } => {
                if let Some(task) = self.tasks.get_mut(task_id) {
                    task.runtime_overrides.set(*role, None);
                    if *role == RuntimeRole::Worker {
                        task.runtime_override = None;
                    }
                    task.updated_at = timestamp;
                }
            }
            EventPayload::TaskRunModeSet { task_id, mode } => {
                if let Some(task) = self.tasks.get_mut(task_id) {
                    task.run_mode = *mode;
                    task.updated_at = timestamp;
                }
            }
            // The close reason is not stored in the projection.
            EventPayload::TaskClosed { id, reason: _ } => {
                if let Some(task) = self.tasks.get_mut(id) {
                    task.state = TaskState::Closed;
                    task.updated_at = timestamp;
                }
            }
            // ---- Repositories ---------------------------------------------
            // Appends without checking for an existing repository of the same name.
            EventPayload::RepositoryAttached {
                project_id,
                path,
                name,
                access_mode,
            } => {
                if let Some(project) = self.projects.get_mut(project_id) {
                    project.repositories.push(Repository {
                        name: name.clone(),
                        path: path.clone(),
                        access_mode: *access_mode,
                    });
                    project.updated_at = timestamp;
                }
            }
            // Removes every repository whose name matches.
            EventPayload::RepositoryDetached { project_id, name } => {
                if let Some(project) = self.projects.get_mut(project_id) {
                    project.repositories.retain(|r| r.name != *name);
                    project.updated_at = timestamp;
                }
            }

            // ---- Governance -----------------------------------------------
            // Inserted regardless of whether the project exists in `projects`.
            EventPayload::GovernanceProjectStorageInitialized {
                project_id,
                schema_version,
                projection_version,
                root_path,
            } => {
                self.governance_projects.insert(
                    *project_id,
                    GovernanceProjectStorage {
                        project_id: *project_id,
                        schema_version: schema_version.clone(),
                        projection_version: *projection_version,
                        root_path: root_path.clone(),
                        initialized_at: timestamp,
                    },
                );
            }
            // Artifacts without a project are keyed under the literal `global`.
            EventPayload::GovernanceArtifactUpserted {
                project_id,
                scope,
                artifact_kind,
                artifact_key,
                path,
                revision,
                schema_version,
                projection_version,
            } => {
                let project_key =
                    project_id.map_or_else(|| "global".to_string(), |id| id.to_string());
                let key = format!("{project_key}::{scope}::{artifact_kind}::{artifact_key}");
                self.governance_artifacts.insert(
                    key,
                    GovernanceArtifact {
                        project_id: *project_id,
                        scope: scope.clone(),
                        artifact_kind: artifact_kind.clone(),
                        artifact_key: artifact_key.clone(),
                        path: path.clone(),
                        revision: *revision,
                        schema_version: schema_version.clone(),
                        projection_version: *projection_version,
                        updated_at: timestamp,
                    },
                );
            }
            // Must build the same key as the upsert handler above.
            EventPayload::GovernanceArtifactDeleted {
                project_id,
                scope,
                artifact_kind,
                artifact_key,
                path: _,
                schema_version: _,
                projection_version: _,
            } => {
                let project_key =
                    project_id.map_or_else(|| "global".to_string(), |id| id.to_string());
                let key = format!("{project_key}::{scope}::{artifact_kind}::{artifact_key}");
                self.governance_artifacts.remove(&key);
            }
            // Detachment is stored as `attached == false`; the entry is kept.
            EventPayload::GovernanceAttachmentLifecycleUpdated {
                project_id,
                task_id,
                artifact_kind,
                artifact_key,
                attached,
                schema_version,
                projection_version,
            } => {
                let key = format!("{project_id}::{task_id}::{artifact_kind}::{artifact_key}");
                self.governance_attachments.insert(
                    key,
                    GovernanceAttachment {
                        project_id: *project_id,
                        task_id: *task_id,
                        artifact_kind: artifact_kind.clone(),
                        artifact_key: artifact_key.clone(),
                        attached: *attached,
                        schema_version: schema_version.clone(),
                        projection_version: *projection_version,
                        updated_at: timestamp,
                    },
                );
            }
            // Migrations are an append-only log.
            EventPayload::GovernanceStorageMigrated {
                project_id,
                from_layout,
                to_layout,
                migrated_paths,
                rollback_hint,
                schema_version,
                projection_version,
            } => {
                self.governance_migrations.push(GovernanceMigration {
                    project_id: *project_id,
                    from_layout: from_layout.clone(),
                    to_layout: to_layout.clone(),
                    migrated_paths: migrated_paths.clone(),
                    rollback_hint: rollback_hint.clone(),
                    schema_version: schema_version.clone(),
                    projection_version: *projection_version,
                    migrated_at: timestamp,
                });
            }
            // Initialization and update are projected identically.
            EventPayload::ConstitutionInitialized {
                project_id,
                schema_version,
                constitution_version,
                digest,
                ..
            }
            | EventPayload::ConstitutionUpdated {
                project_id,
                schema_version,
                constitution_version,
                digest,
                ..
            } => {
                if let Some(project) = self.projects.get_mut(project_id) {
                    project.constitution_digest = Some(digest.clone());
                    project.constitution_schema_version = Some(schema_version.clone());
                    project.constitution_version = Some(*constitution_version);
                    project.constitution_updated_at = Some(timestamp);
                    project.updated_at = timestamp;
                }
            }
            // ---- Task graphs ----------------------------------------------
            // New graphs start as empty drafts.
            EventPayload::TaskGraphCreated {
                graph_id,
                project_id,
                name,
                description,
            } => {
                self.graphs.insert(
                    *graph_id,
                    TaskGraph {
                        id: *graph_id,
                        project_id: *project_id,
                        name: name.clone(),
                        description: description.clone(),
                        state: GraphState::Draft,
                        tasks: HashMap::new(),
                        dependencies: HashMap::<Uuid, HashSet<Uuid>>::new(),
                        created_at: timestamp,
                        updated_at: timestamp,
                    },
                );
            }
            // Ensures the task has a (possibly empty) dependency set.
            EventPayload::TaskAddedToGraph { graph_id, task } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    graph.tasks.insert(task.id, task.clone());
                    graph.dependencies.entry(task.id).or_default();
                    graph.updated_at = timestamp;
                }
            }
            // `dependencies` is keyed by `to_task`; `from_task` is added to its set.
            EventPayload::DependencyAdded {
                graph_id,
                from_task,
                to_task,
            } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    graph
                        .dependencies
                        .entry(*to_task)
                        .or_default()
                        .insert(*from_task);
                    graph.updated_at = timestamp;
                }
            }
            EventPayload::GraphTaskCheckAdded {
                graph_id,
                task_id,
                check,
            } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    if let Some(task) = graph.tasks.get_mut(task_id) {
                        task.criteria.checks.push(check.clone());
                        graph.updated_at = timestamp;
                    }
                }
            }
            EventPayload::ScopeAssigned {
                graph_id,
                task_id,
                scope,
            } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    if let Some(task) = graph.tasks.get_mut(task_id) {
                        task.scope = Some(scope.clone());
                        graph.updated_at = timestamp;
                    }
                }
            }

            // A failed validation (`valid == false`) leaves the graph untouched.
            EventPayload::TaskGraphValidated {
                graph_id,
                project_id: _,
                valid,
                issues: _,
            } => {
                if *valid {
                    if let Some(graph) = self.graphs.get_mut(graph_id) {
                        graph.state = GraphState::Validated;
                        graph.updated_at = timestamp;
                    }
                }
            }

            EventPayload::TaskGraphLocked {
                graph_id,
                project_id: _,
            } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    graph.state = GraphState::Locked;
                    graph.updated_at = timestamp;
                }
            }
            // ---- Task flows -----------------------------------------------
            // Creating a flow locks its graph, seeds one pending execution per
            // task id and ensures a runtime-defaults entry exists for the flow.
            EventPayload::TaskFlowCreated {
                flow_id,
                graph_id,
                project_id,
                name: _,
                task_ids,
            } => {
                if let Some(graph) = self.graphs.get_mut(graph_id) {
                    graph.state = GraphState::Locked;
                    graph.updated_at = timestamp;
                }

                let mut task_executions = HashMap::new();
                for task_id in task_ids {
                    task_executions.insert(
                        *task_id,
                        TaskExecution {
                            task_id: *task_id,
                            state: TaskExecState::Pending,
                            attempt_count: 0,
                            retry_mode: RetryMode::default(),
                            frozen_commit_sha: None,
                            integrated_commit_sha: None,
                            updated_at: timestamp,
                            blocked_reason: None,
                        },
                    );
                }

                self.flows.insert(
                    *flow_id,
                    TaskFlow {
                        id: *flow_id,
                        graph_id: *graph_id,
                        project_id: *project_id,
                        base_revision: None,
                        run_mode: RunMode::Manual,
                        depends_on_flows: HashSet::new(),
                        state: FlowState::Created,
                        task_executions,
                        created_at: timestamp,
                        started_at: None,
                        completed_at: None,
                        updated_at: timestamp,
                    },
                );
                self.flow_runtime_defaults.entry(*flow_id).or_default();
            }
            EventPayload::TaskFlowDependencyAdded {
                flow_id,
                depends_on_flow_id,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.depends_on_flows.insert(*depends_on_flow_id);
                    flow.updated_at = timestamp;
                }
            }
            EventPayload::TaskFlowRunModeSet { flow_id, mode } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.run_mode = *mode;
                    flow.updated_at = timestamp;
                }
            }
            // The defaults entry is created even when the flow itself is
            // unknown; only the `updated_at` bump depends on the flow existing.
            EventPayload::TaskFlowRuntimeConfigured {
                flow_id,
                role,
                adapter_name,
                binary_path,
                model,
                args,
                env,
                timeout_ms,
                max_parallel_tasks,
            } => {
                let configured = ProjectRuntimeConfig {
                    adapter_name: adapter_name.clone(),
                    binary_path: binary_path.clone(),
                    model: model.clone(),
                    args: args.clone(),
                    env: env.clone(),
                    timeout_ms: *timeout_ms,
                    max_parallel_tasks: *max_parallel_tasks,
                };
                self.flow_runtime_defaults
                    .entry(*flow_id)
                    .or_default()
                    .set(*role, Some(configured));
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.updated_at = timestamp;
                }
            }
            // Like the configure handler, this creates the entry if missing.
            EventPayload::TaskFlowRuntimeCleared { flow_id, role } => {
                self.flow_runtime_defaults
                    .entry(*flow_id)
                    .or_default()
                    .set(*role, None);
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.updated_at = timestamp;
                }
            }
            EventPayload::TaskFlowStarted {
                flow_id,
                base_revision,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Running;
                    flow.started_at = Some(timestamp);
                    flow.base_revision.clone_from(base_revision);
                    flow.updated_at = timestamp;
                }
            }
            EventPayload::TaskFlowPaused {
                flow_id,
                running_tasks: _,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Paused;
                    flow.updated_at = timestamp;
                }
            }
            // Resuming does not modify `started_at`.
            EventPayload::TaskFlowResumed { flow_id } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Running;
                    flow.updated_at = timestamp;
                }
            }
            EventPayload::TaskFlowCompleted { flow_id } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Completed;
                    flow.completed_at = Some(timestamp);
                    flow.updated_at = timestamp;
                }
            }

            EventPayload::FlowFrozenForMerge { flow_id } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::FrozenForMerge;
                    flow.updated_at = timestamp;
                }
            }
            // An abort also stamps `completed_at`.
            EventPayload::TaskFlowAborted {
                flow_id,
                reason: _,
                forced: _,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Aborted;
                    flow.completed_at = Some(timestamp);
                    flow.updated_at = timestamp;
                }
            }
            // ---- Task executions ------------------------------------------
            // The flow's `updated_at` is bumped even if the task execution is
            // not found.
            EventPayload::TaskReady { flow_id, task_id } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
                        exec.state = TaskExecState::Ready;
                        exec.blocked_reason = None;
                        exec.updated_at = timestamp;
                    }
                    flow.updated_at = timestamp;
                }
            }
            // A blocked task is represented as `Pending` plus a `blocked_reason`.
            EventPayload::TaskBlocked {
                flow_id,
                task_id,
                reason,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
                        exec.state = TaskExecState::Pending;
                        exec.blocked_reason.clone_from(reason);
                        exec.updated_at = timestamp;
                    }
                    flow.updated_at = timestamp;
                }
            }
            // Only the target state is used; every transition into `Running`
            // counts as one more attempt.
            EventPayload::TaskExecutionStateChanged {
                flow_id,
                task_id,
                attempt_id: _,
                from: _,
                to,
            } => {
                if let Some(flow) = self.flows.get_mut(flow_id) {
                    if let Some(exec) = flow.task_executions.get_mut(task_id) {
                        exec.state = *to;
                        exec.updated_at = timestamp;
                        exec.blocked_reason = None;
                        if *to == TaskExecState::Running {
                            exec.attempt_count += 1;
                        }
                    }
                    flow.updated_at = timestamp;
                }
            }

            // ---- Attempts and checkpoints ---------------------------------
            // Inserted without checking that the flow or task exists.
            EventPayload::AttemptStarted {
                flow_id,
                task_id,
                attempt_id,
                attempt_number,
            } => {
                self.attempts.insert(
                    *attempt_id,
                    AttemptState {
                        id: *attempt_id,
                        flow_id: *flow_id,
                        task_id: *task_id,
                        attempt_number: *attempt_number,
                        started_at: timestamp,
                        baseline_id: None,
                        diff_id: None,
                        check_results: Vec::new(),
                        checkpoints: Vec::new(),
                        all_checkpoints_completed: false,
                    },
                );
            }

            // Duplicate declarations (same checkpoint id) are ignored; the
            // list is re-sorted by `order` after each insertion.
            EventPayload::CheckpointDeclared {
                attempt_id,
                checkpoint_id,
                order,
                total,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    let exists = attempt
                        .checkpoints
                        .iter()
                        .any(|cp| cp.checkpoint_id == *checkpoint_id);
                    if !exists {
                        attempt.checkpoints.push(AttemptCheckpoint {
                            checkpoint_id: checkpoint_id.clone(),
                            order: *order,
                            total: *total,
                            state: AttemptCheckpointState::Declared,
                            commit_hash: None,
                            completed_at: None,
                            summary: None,
                        });
                        attempt.checkpoints.sort_by_key(|cp| cp.order);
                    }
                }
            }

            // The named checkpoint becomes `Active` (even if it was already
            // `Completed`); every other non-completed checkpoint is reset to
            // `Declared`, so at most one checkpoint is active afterwards.
            EventPayload::CheckpointActivated {
                attempt_id,
                checkpoint_id,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    for cp in &mut attempt.checkpoints {
                        if cp.checkpoint_id == *checkpoint_id {
                            cp.state = AttemptCheckpointState::Active;
                        } else if cp.state != AttemptCheckpointState::Completed {
                            cp.state = AttemptCheckpointState::Declared;
                        }
                    }
                }
            }

            // NOTE: the `timestamp` bound here is the payload field and
            // shadows the event timestamp for this arm, so `completed_at` is
            // the payload's value, not `event.timestamp()`.
            EventPayload::CheckpointCompleted {
                attempt_id,
                checkpoint_id,
                commit_hash,
                timestamp,
                summary,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    if let Some(cp) = attempt
                        .checkpoints
                        .iter_mut()
                        .find(|cp| cp.checkpoint_id == *checkpoint_id)
                    {
                        cp.state = AttemptCheckpointState::Completed;
                        cp.commit_hash = Some(commit_hash.clone());
                        cp.completed_at = Some(*timestamp);
                        cp.summary.clone_from(summary);
                    }
                }
            }

            // The flag is set as reported; individual checkpoint states are
            // not verified here.
            EventPayload::AllCheckpointsCompleted { attempt_id, .. } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    attempt.all_checkpoints_completed = true;
                }
            }

            EventPayload::BaselineCaptured {
                attempt_id,
                baseline_id,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    attempt.baseline_id = Some(*baseline_id);
                }
            }

            EventPayload::DiffComputed {
                attempt_id,
                diff_id,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    attempt.diff_id = Some(*diff_id);
                }
            }

            // Results accumulate; a repeated check name adds a second entry.
            EventPayload::CheckCompleted {
                attempt_id,
                check_name,
                passed,
                exit_code,
                output,
                duration_ms,
                required,
                ..
            } => {
                if let Some(attempt) = self.attempts.get_mut(attempt_id) {
                    attempt.check_results.push(CheckResult {
                        name: check_name.clone(),
                        passed: *passed,
                        exit_code: *exit_code,
                        output: output.clone(),
                        duration_ms: *duration_ms,
                        required: *required,
                    });
                }
            }

            // ---- Events with no effect on the projection ------------------
            // Listed explicitly (no `_` wildcard) so that adding a new
            // `EventPayload` variant forces a decision here at compile time.
            EventPayload::CheckStarted { .. }
            | EventPayload::ConstitutionValidated { .. }
            | EventPayload::TemplateInstantiated { .. }
            | EventPayload::ErrorOccurred { .. }
            | EventPayload::TaskExecutionStarted { .. }
            | EventPayload::TaskExecutionSucceeded { .. }
            | EventPayload::TaskExecutionFailed { .. }
            | EventPayload::MergeConflictDetected { .. }
            | EventPayload::MergeCheckStarted { .. }
            | EventPayload::MergeCheckCompleted { .. }
            | EventPayload::RuntimeStarted { .. }
            | EventPayload::RuntimeOutputChunk { .. }
            | EventPayload::RuntimeInputProvided { .. }
            | EventPayload::RuntimeInterrupted { .. }
            | EventPayload::RuntimeExited { .. }
            | EventPayload::RuntimeTerminated { .. }
            | EventPayload::RuntimeErrorClassified { .. }
            | EventPayload::RuntimeRecoveryScheduled { .. }
            | EventPayload::RuntimeFilesystemObserved { .. }
            | EventPayload::RuntimeCommandObserved { .. }
            | EventPayload::RuntimeToolCallObserved { .. }
            | EventPayload::RuntimeTodoSnapshotUpdated { .. }
            | EventPayload::RuntimeNarrativeOutputObserved { .. }
            | EventPayload::FileModified { .. }
            | EventPayload::CheckpointCommitCreated { .. }
            | EventPayload::ScopeValidated { .. }
            | EventPayload::ScopeViolationDetected { .. }
            | EventPayload::ScopeConflictDetected { .. }
            | EventPayload::TaskSchedulingDeferred { .. }
            | EventPayload::RetryContextAssembled { .. }
            | EventPayload::FlowIntegrationLockAcquired { .. }
            | EventPayload::WorktreeCleanupPerformed { .. }
            | EventPayload::Unknown => {}

            // ---- Task-level events that carry no flow id in the payload ---
            // The flow is taken from the event's correlation metadata when
            // present; otherwise every flow containing the task is a
            // candidate and the first one found is updated.
            // NOTE(review): `HashMap` iteration order is unspecified, so if a
            // task appears in several flows and no correlation flow id is
            // set, which flow gets updated may differ between replays.
            EventPayload::TaskRetryRequested {
                task_id,
                reset_count,
                retry_mode,
            } => {
                let flow_id = event.metadata.correlation.flow_id;
                let mut candidate_flow_ids = Vec::new();

                if let Some(fid) = flow_id {
                    candidate_flow_ids.push(fid);
                } else {
                    for (fid, flow) in &self.flows {
                        if flow.task_executions.contains_key(task_id) {
                            candidate_flow_ids.push(*fid);
                        }
                    }
                }

                for fid in candidate_flow_ids {
                    if let Some(flow) = self.flows.get_mut(&fid) {
                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
                            exec.state = TaskExecState::Pending;
                            exec.blocked_reason = None;
                            exec.updated_at = timestamp;
                            exec.retry_mode = *retry_mode;
                            if *reset_count {
                                exec.attempt_count = 0;
                            }
                            flow.updated_at = timestamp;
                            // Only the first matching flow is updated.
                            break;
                        }
                    }
                }
            }

            // Same flow resolution as `TaskRetryRequested` (including the
            // iteration-order caveat); the execution is marked `Failed`.
            EventPayload::TaskAborted { task_id, reason: _ } => {
                let flow_id = event.metadata.correlation.flow_id;
                let mut candidate_flow_ids = Vec::new();

                if let Some(fid) = flow_id {
                    candidate_flow_ids.push(fid);
                } else {
                    for (fid, flow) in &self.flows {
                        if flow.task_executions.contains_key(task_id) {
                            candidate_flow_ids.push(*fid);
                        }
                    }
                }

                for fid in candidate_flow_ids {
                    if let Some(flow) = self.flows.get_mut(&fid) {
                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
                            exec.state = TaskExecState::Failed;
                            exec.blocked_reason = None;
                            exec.updated_at = timestamp;
                            flow.updated_at = timestamp;
                            // Only the first matching flow is updated.
                            break;
                        }
                    }
                }
            }

            // Same flow resolution as `TaskRetryRequested` (including the
            // iteration-order caveat). A decision of exactly "pass" maps to
            // `Success`; any other string maps to `Failed`.
            EventPayload::HumanOverride {
                task_id,
                override_type: _,
                decision,
                reason: _,
                user: _,
            } => {
                let flow_id = event.metadata.correlation.flow_id;
                let mut candidate_flow_ids = Vec::new();

                if let Some(fid) = flow_id {
                    candidate_flow_ids.push(fid);
                } else {
                    for (fid, flow) in &self.flows {
                        if flow.task_executions.contains_key(task_id) {
                            candidate_flow_ids.push(*fid);
                        }
                    }
                }

                let new_state = if decision == "pass" {
                    TaskExecState::Success
                } else {
                    TaskExecState::Failed
                };

                for fid in candidate_flow_ids {
                    if let Some(flow) = self.flows.get_mut(&fid) {
                        if let Some(exec) = flow.task_executions.get_mut(task_id) {
                            exec.state = new_state;
                            exec.blocked_reason = None;
                            exec.updated_at = timestamp;
                            flow.updated_at = timestamp;
                            // Only the first matching flow is updated.
                            break;
                        }
                    }
                }
            }

            // ---- Merges ---------------------------------------------------
            // Creates (or replaces) the merge state with no commits yet.
            EventPayload::MergePrepared {
                flow_id,
                target_branch,
                conflicts,
            } => {
                self.merge_states.insert(
                    *flow_id,
                    MergeState {
                        flow_id: *flow_id,
                        status: MergeStatus::Prepared,
                        target_branch: target_branch.clone(),
                        conflicts: conflicts.clone(),
                        commits: Vec::new(),
                        updated_at: timestamp,
                    },
                );
            }
            // The approving user is not stored in the projection.
            EventPayload::MergeApproved { flow_id, user: _ } => {
                if let Some(ms) = self.merge_states.get_mut(flow_id) {
                    ms.status = MergeStatus::Approved;
                    ms.updated_at = timestamp;
                }
            }
            // Updates the merge state and the flow independently: the flow is
            // marked `Merged` even if no merge state exists for it.
            EventPayload::MergeCompleted { flow_id, commits } => {
                if let Some(ms) = self.merge_states.get_mut(flow_id) {
                    ms.status = MergeStatus::Completed;
                    commits.clone_into(&mut ms.commits);
                    ms.updated_at = timestamp;
                }

                if let Some(flow) = self.flows.get_mut(flow_id) {
                    flow.state = FlowState::Merged;
                    flow.completed_at = Some(timestamp);
                    flow.updated_at = timestamp;
                }
            }
        }
    }
1286
1287 #[must_use]
1290 pub fn replay(events: &[Event]) -> Self {
1291 let mut state = Self::new();
1292 for event in events {
1293 state.apply_mut(event);
1294 }
1295 state
1296 }
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::events::CorrelationIds;

    /// Builds a `ProjectCreated` event without a description, correlated to
    /// the project it creates.
    fn project_created(project_id: Uuid, name: &str) -> Event {
        Event::new(
            EventPayload::ProjectCreated {
                id: project_id,
                name: name.to_string(),
                description: None,
            },
            CorrelationIds::for_project(project_id),
        )
    }

    /// Two replays of the same create-then-rename log must agree on the
    /// number of projects and on the project's name.
    #[test]
    fn replay_is_deterministic() {
        let project_id = Uuid::new_v4();
        let log = [
            project_created(project_id, "test"),
            Event::new(
                EventPayload::ProjectUpdated {
                    id: project_id,
                    name: Some("updated".to_string()),
                    description: None,
                },
                CorrelationIds::for_project(project_id),
            ),
        ];

        let first = AppState::replay(&log);
        let second = AppState::replay(&log);

        assert_eq!(first.projects.len(), second.projects.len());
        assert_eq!(
            first.projects.get(&project_id).unwrap().name,
            second.projects.get(&project_id).unwrap().name
        );
    }

    /// Replaying a single-creation log twice yields exactly one project in
    /// each resulting state.
    #[test]
    fn replay_is_idempotent() {
        let project_id = Uuid::new_v4();
        let log = [project_created(project_id, "test")];

        let first = AppState::replay(&log);
        let second = AppState::replay(&log);

        assert_eq!(first.projects.len(), 1);
        assert_eq!(second.projects.len(), 1);
    }

    /// A task that is created and then closed ends up in `TaskState::Closed`.
    #[test]
    fn task_lifecycle() {
        let project_id = Uuid::new_v4();
        let task_id = Uuid::new_v4();

        // Every task-level event in this scenario shares the same correlation.
        let task_event = |payload: EventPayload| {
            Event::new(payload, CorrelationIds::for_task(project_id, task_id))
        };

        let log = [
            project_created(project_id, "proj"),
            task_event(EventPayload::TaskCreated {
                id: task_id,
                project_id,
                title: "task1".to_string(),
                description: None,
                scope: None,
            }),
            task_event(EventPayload::TaskClosed {
                id: task_id,
                reason: None,
            }),
        ];

        let replayed = AppState::replay(&log);
        let closed_task = replayed.tasks.get(&task_id).unwrap();

        assert_eq!(closed_task.state, TaskState::Closed);
    }
}