Skip to main content

awaken_runtime/phase/
engine.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use futures::future::join_all;
6use futures::lock::Mutex;
7
8use crate::state::{MergeStrategy, MutationBatch, Snapshot, StateCommand, StateStore};
9use awaken_contract::StateError;
10use awaken_contract::model::{
11    FailedScheduledAction, FailedScheduledActionUpdate, FailedScheduledActions,
12    PendingScheduledActions, Phase, ScheduledActionEnvelope, ScheduledActionQueueUpdate,
13    TypedEffect,
14};
15
16use super::PhaseContext;
17use super::env::{ExecutionEnv, TaggedPhaseHook};
18use super::queue_plugin::RuntimeQueuePlugin;
19use super::reports::{
20    DEFAULT_MAX_PHASE_ROUNDS, EffectDispatchReport, PhaseRunReport, SubmitCommandReport,
21};
22
23#[derive(Clone)]
24pub struct PhaseRuntime {
25    store: StateStore,
26    execution_lock: Arc<Mutex<()>>,
27    next_id: Arc<AtomicU64>,
28}
29
30impl PhaseRuntime {
31    pub fn new(store: StateStore) -> Result<Self, StateError> {
32        match store.install_plugin(RuntimeQueuePlugin) {
33            Ok(()) => {}
34            Err(StateError::PluginAlreadyInstalled { .. }) => {}
35            Err(err) => return Err(err),
36        }
37
38        Ok(Self {
39            store,
40            execution_lock: Arc::new(Mutex::new(())),
41            next_id: Arc::new(AtomicU64::new(1)),
42        })
43    }
44
45    pub fn store(&self) -> &StateStore {
46        &self.store
47    }
48
49    pub async fn submit_command(
50        &self,
51        env: &ExecutionEnv,
52        command: StateCommand,
53    ) -> Result<SubmitCommandReport, StateError> {
54        let _guard = self.execution_lock.lock().await;
55        self.submit_command_inner(env, command).await
56    }
57
58    pub async fn run_phase(
59        &self,
60        env: &ExecutionEnv,
61        phase: Phase,
62    ) -> Result<PhaseRunReport, StateError> {
63        self.run_phase_with_limit(env, phase, DEFAULT_MAX_PHASE_ROUNDS)
64            .await
65    }
66
67    pub async fn run_phase_with_context(
68        &self,
69        env: &ExecutionEnv,
70        ctx: PhaseContext,
71    ) -> Result<PhaseRunReport, StateError> {
72        self.run_phase_ctx_inner(env, ctx, DEFAULT_MAX_PHASE_ROUNDS)
73            .await
74    }
75
76    /// Run phase hooks without committing — return the combined StateCommand.
77    pub async fn collect_commands(
78        &self,
79        env: &ExecutionEnv,
80        ctx: PhaseContext,
81    ) -> Result<StateCommand, StateError> {
82        self.run_hooks_collect(env, ctx).await
83    }
84
85    /// Run only the EXECUTE stage of a phase (no GATHER/hook execution).
86    ///
87    /// Processes pending scheduled actions that match the given phase and have
88    /// a registered handler. Used when GATHER was already done via `collect_commands`
89    /// and the caller has manually submitted the remaining command.
90    pub(crate) async fn run_execute_loop(
91        &self,
92        env: &ExecutionEnv,
93        ctx: PhaseContext,
94    ) -> Result<PhaseRunReport, StateError> {
95        self.run_execute_loop_inner(env, ctx, DEFAULT_MAX_PHASE_ROUNDS)
96            .await
97    }
98
99    pub async fn run_phase_with_limit(
100        &self,
101        env: &ExecutionEnv,
102        phase: Phase,
103        max_rounds: usize,
104    ) -> Result<PhaseRunReport, StateError> {
105        let ctx = PhaseContext::new(phase, self.store.snapshot());
106        self.run_phase_ctx_inner(env, ctx, max_rounds).await
107    }
108
109    /// EXECUTE-only inner: same convergence loop as `run_phase_ctx_inner` but
110    /// without the GATHER (hook execution) stage.
111    async fn run_execute_loop_inner(
112        &self,
113        env: &ExecutionEnv,
114        base_ctx: PhaseContext,
115        max_rounds: usize,
116    ) -> Result<PhaseRunReport, StateError> {
117        let _guard = self.execution_lock.lock().await;
118        self.execute_scheduled_actions(env, &base_ctx, max_rounds)
119            .await
120    }
121
122    async fn run_phase_ctx_inner(
123        &self,
124        env: &ExecutionEnv,
125        base_ctx: PhaseContext,
126        max_rounds: usize,
127    ) -> Result<PhaseRunReport, StateError> {
128        // Check cancellation at phase entry
129        if let Some(token) = base_ctx.cancellation_token.as_ref()
130            && token.is_cancelled()
131        {
132            return Err(StateError::Cancelled);
133        }
134
135        let _guard = self.execution_lock.lock().await;
136
137        let (hook_effects, hook_effect_report) =
138            self.gather_and_commit_hooks(env, &base_ctx).await?;
139
140        // Check cancellation after hooks, before scheduled action execution
141        if let Some(token) = base_ctx.cancellation_token.as_ref()
142            && token.is_cancelled()
143        {
144            return Err(StateError::Cancelled);
145        }
146
147        let mut report = self
148            .execute_scheduled_actions(env, &base_ctx, max_rounds)
149            .await?;
150
151        report.generated_effects += hook_effects;
152        report.effect_report.attempted += hook_effect_report.attempted;
153        report.effect_report.dispatched += hook_effect_report.dispatched;
154        report.effect_report.failed += hook_effect_report.failed;
155
156        Ok(report)
157    }
158
159    /// Convergence loop that processes pending scheduled actions matching the
160    /// phase until no more remain. Callers must hold the execution lock.
161    async fn execute_scheduled_actions(
162        &self,
163        env: &ExecutionEnv,
164        base_ctx: &PhaseContext,
165        max_rounds: usize,
166    ) -> Result<PhaseRunReport, StateError> {
167        let phase = base_ctx.phase;
168        let mut total_processed = 0;
169        let mut total_skipped = 0;
170        let mut total_failed = 0;
171        let mut total_effects = 0;
172        let mut effect_report = EffectDispatchReport {
173            attempted: 0,
174            dispatched: 0,
175            failed: 0,
176        };
177        let mut rounds = 0;
178
179        loop {
180            rounds += 1;
181            if rounds > max_rounds {
182                return Err(StateError::PhaseRunLoopExceeded { phase, max_rounds });
183            }
184
185            let queued = self
186                .store
187                .read::<PendingScheduledActions>()
188                .unwrap_or_default();
189
190            let matching: Vec<_> = queued
191                .into_iter()
192                .filter(|envelope| {
193                    envelope.action.phase == phase
194                        && env
195                            .scheduled_action_handlers
196                            .contains_key(&envelope.action.key)
197                })
198                .collect();
199
200            tracing::debug!(phase = ?phase, actions = matching.len(), "execute_scheduled_actions");
201
202            if matching.is_empty() {
203                if rounds == 1 {
204                    total_skipped = self
205                        .store
206                        .read::<PendingScheduledActions>()
207                        .unwrap_or_default()
208                        .iter()
209                        .filter(|envelope| envelope.action.phase != phase)
210                        .count();
211                }
212                break;
213            }
214
215            for envelope in matching {
216                let handler = env
217                    .scheduled_action_handlers
218                    .get(&envelope.action.key)
219                    .cloned()
220                    .expect("handler existence verified in filter above");
221
222                let ctx = base_ctx.clone().with_snapshot(self.store.snapshot());
223                let mut command = match handler
224                    .handle_erased(&ctx, envelope.action.payload.clone())
225                    .await
226                {
227                    Ok(command) => command,
228                    Err(err) => {
229                        self.dead_letter(envelope, err.to_string())?;
230                        total_failed += 1;
231                        continue;
232                    }
233                };
234                total_effects += command.effects.len();
235                command.patch.update::<PendingScheduledActions>(
236                    ScheduledActionQueueUpdate::Remove { id: envelope.id },
237                );
238                match self.submit_command_inner(env, command).await {
239                    Ok(report) => {
240                        total_processed += 1;
241                        effect_report.attempted += report.effect_report.attempted;
242                        effect_report.dispatched += report.effect_report.dispatched;
243                        effect_report.failed += report.effect_report.failed;
244                    }
245                    Err(err) => {
246                        self.dead_letter(
247                            envelope,
248                            format!("failed to submit action command: {err}"),
249                        )?;
250                        total_failed += 1;
251                    }
252                }
253            }
254        }
255
256        Ok(PhaseRunReport {
257            phase,
258            rounds,
259            processed_scheduled_actions: total_processed,
260            skipped_scheduled_actions: total_skipped,
261            failed_scheduled_actions: total_failed,
262            generated_effects: total_effects,
263            effect_report,
264        })
265    }
266
267    async fn submit_command_inner(
268        &self,
269        env: &ExecutionEnv,
270        mut command: StateCommand,
271    ) -> Result<SubmitCommandReport, StateError> {
272        // Validate all action keys have a registered handler.
273        for action in &command.scheduled_actions {
274            if !env.scheduled_action_handlers.contains_key(&action.key) {
275                return Err(StateError::UnknownScheduledActionHandler {
276                    key: action.key.clone(),
277                });
278            }
279        }
280        // Validate effect keys have registered handlers.
281        for effect in &command.effects {
282            if !env.effect_handlers.contains_key(&effect.key) {
283                return Err(StateError::UnknownEffectHandler {
284                    key: effect.key.clone(),
285                });
286            }
287        }
288
289        for action in command.scheduled_actions.drain(..) {
290            let entry = ScheduledActionEnvelope {
291                id: self.next_id.fetch_add(1, Ordering::SeqCst),
292                action,
293            };
294            tracing::debug!(
295                id = entry.id,
296                phase = ?entry.action.phase,
297                key = %entry.action.key,
298                "scheduled action enqueued"
299            );
300            command
301                .patch
302                .update::<PendingScheduledActions>(ScheduledActionQueueUpdate::Push(entry));
303        }
304
305        let mut effects = Vec::new();
306        for effect in command.effects.drain(..) {
307            let id = self.next_id.fetch_add(1, Ordering::SeqCst);
308            tracing::debug!(id, key = %effect.key, "effect dispatching");
309            effects.push(effect);
310        }
311
312        let revision = self.store.commit(command.patch)?;
313        let snapshot = self.store.snapshot();
314        let effect_report = self.dispatch_effects(env, &effects, &snapshot).await;
315        Ok(SubmitCommandReport {
316            revision,
317            effect_report,
318        })
319    }
320
321    async fn dispatch_effects(
322        &self,
323        env: &ExecutionEnv,
324        effects: &[TypedEffect],
325        snapshot: &Snapshot,
326    ) -> EffectDispatchReport {
327        let mut report = EffectDispatchReport {
328            attempted: 0,
329            dispatched: 0,
330            failed: 0,
331        };
332
333        for effect in effects {
334            report.attempted += 1;
335            let Some(handler) = env.effect_handlers.get(&effect.key) else {
336                report.failed += 1;
337                continue;
338            };
339
340            match handler
341                .handle_erased(effect.payload.clone(), snapshot)
342                .await
343            {
344                Ok(()) => report.dispatched += 1,
345                Err(_) => report.failed += 1,
346            }
347        }
348
349        report
350    }
351
352    /// Run phase hooks, collecting their commands without committing.
353    /// Returns a single merged command; fails on Exclusive conflicts (no auto-fallback).
354    async fn run_hooks_collect(
355        &self,
356        env: &ExecutionEnv,
357        ctx: PhaseContext,
358    ) -> Result<StateCommand, StateError> {
359        let snapshot = self.store.snapshot();
360        let hooks = Self::filter_hooks(env, &ctx);
361        let indexed = Self::run_hooks_indexed(&hooks, &ctx, &snapshot).await?;
362        let commands = indexed.into_iter().map(|(_, cmd)| cmd).collect();
363        self.store.merge_all_commands(commands)
364    }
365
366    /// Run phase hooks with Exclusive conflict auto-fallback.
367    ///
368    /// Hooks are pure functions (frozen snapshot in, `StateCommand` out, no side effects),
369    /// so re-execution on conflict is always safe.
370    ///
371    /// Algorithm:
372    /// 1. Run all hooks in parallel against a frozen snapshot.
373    /// 2. If no Exclusive key overlaps, merge all and commit once.
374    /// 3. On conflict: greedily partition into a compatible batch + deferred set.
375    ///    Commit the batch, then re-run deferred hooks serially against fresh snapshots.
376    async fn gather_and_commit_hooks(
377        &self,
378        env: &ExecutionEnv,
379        base_ctx: &PhaseContext,
380    ) -> Result<(usize, EffectDispatchReport), StateError> {
381        let hooks = Self::filter_hooks(env, base_ctx);
382        if hooks.is_empty() {
383            return Ok((
384                0,
385                EffectDispatchReport {
386                    attempted: 0,
387                    dispatched: 0,
388                    failed: 0,
389                },
390            ));
391        }
392
393        tracing::debug!(phase = ?base_ctx.phase, hooks = hooks.len(), "gather_start");
394
395        let snapshot = self.store.snapshot();
396        let indexed = Self::run_hooks_indexed(&hooks, base_ctx, &snapshot).await?;
397
398        if indexed.is_empty() {
399            return Ok((
400                0,
401                EffectDispatchReport {
402                    attempted: 0,
403                    dispatched: 0,
404                    failed: 0,
405                },
406            ));
407        }
408
409        // Fast path: no Exclusive key overlap → merge all, commit once
410        let has_conflicts = {
411            let registry = self.store.registry.lock();
412            has_exclusive_key_overlap(&indexed, |k| registry.merge_strategy(k))
413        };
414
415        let mut total_effects = 0;
416        let mut effect_report = EffectDispatchReport {
417            attempted: 0,
418            dispatched: 0,
419            failed: 0,
420        };
421
422        if !has_conflicts {
423            let commands = indexed.into_iter().map(|(_, cmd)| cmd).collect();
424            let merged = self.store.merge_all_commands(commands)?;
425            if !merged.is_empty() {
426                total_effects += merged.effects.len();
427                let r = self.submit_command_inner(env, merged).await?;
428                effect_report.attempted += r.effect_report.attempted;
429                effect_report.dispatched += r.effect_report.dispatched;
430                effect_report.failed += r.effect_report.failed;
431            }
432            return Ok((total_effects, effect_report));
433        }
434
435        // Conflict fallback: partition into compatible batch + deferred
436        tracing::warn!(phase = ?base_ctx.phase, "exclusive_conflict_fallback");
437        let (batch_commands, deferred_indices) = {
438            let registry = self.store.registry.lock();
439            partition_commands(indexed, |k| registry.merge_strategy(k))
440        };
441
442        // Commit the compatible batch
443        if !batch_commands.is_empty() {
444            let merged = self.store.merge_all_commands(batch_commands)?;
445            if !merged.is_empty() {
446                total_effects += merged.effects.len();
447                let r = self.submit_command_inner(env, merged).await?;
448                effect_report.attempted += r.effect_report.attempted;
449                effect_report.dispatched += r.effect_report.dispatched;
450                effect_report.failed += r.effect_report.failed;
451            }
452        }
453
454        // Re-run deferred hooks serially, each against a fresh snapshot
455        for hook_idx in deferred_indices {
456            let snap = self.store.snapshot();
457            let ctx = base_ctx.clone().with_snapshot(snap.clone());
458            let mut cmd = hooks[hook_idx].hook.run(&ctx).await?;
459            if cmd.base_revision().is_none() {
460                cmd = cmd.with_base_revision(snap.revision());
461            }
462            if !cmd.is_empty() {
463                total_effects += cmd.effects.len();
464                let r = self.submit_command_inner(env, cmd).await?;
465                effect_report.attempted += r.effect_report.attempted;
466                effect_report.dispatched += r.effect_report.dispatched;
467                effect_report.failed += r.effect_report.failed;
468            }
469        }
470
471        Ok((total_effects, effect_report))
472    }
473
474    fn filter_hooks<'a>(env: &'a ExecutionEnv, ctx: &PhaseContext) -> Vec<&'a TaggedPhaseHook> {
475        let hooks = env.hooks_for_phase(ctx.phase);
476        let active_hook_filter = &ctx.agent_spec.active_hook_filter;
477        hooks
478            .iter()
479            .filter(|tagged| {
480                active_hook_filter.is_empty() || active_hook_filter.contains(&tagged.plugin_id)
481            })
482            .collect()
483    }
484
485    /// Run hooks in parallel, returning (hook_index, command) pairs for non-empty results.
486    async fn run_hooks_indexed(
487        hooks: &[&TaggedPhaseHook],
488        base_ctx: &PhaseContext,
489        snapshot: &Snapshot,
490    ) -> Result<Vec<(usize, StateCommand)>, StateError> {
491        let results = join_all(hooks.iter().enumerate().map(|(i, tagged)| {
492            let hook = tagged.hook.clone();
493            let hook_snapshot = snapshot.clone();
494            let hook_ctx = base_ctx.clone().with_snapshot(hook_snapshot.clone());
495            async move {
496                let mut cmd = hook.run(&hook_ctx).await?;
497                if cmd.base_revision().is_none() {
498                    cmd = cmd.with_base_revision(hook_snapshot.revision());
499                }
500                Ok::<(usize, StateCommand), StateError>((i, cmd))
501            }
502        }))
503        .await;
504
505        let mut indexed = Vec::new();
506        for result in results {
507            let (i, cmd) = result?;
508            if !cmd.is_empty() {
509                indexed.push((i, cmd));
510            }
511        }
512        Ok(indexed)
513    }
514
515    fn dead_letter(
516        &self,
517        envelope: ScheduledActionEnvelope,
518        error: String,
519    ) -> Result<(), StateError> {
520        let mut patch = MutationBatch::new();
521        patch.update::<PendingScheduledActions>(ScheduledActionQueueUpdate::Remove {
522            id: envelope.id,
523        });
524        patch.update::<FailedScheduledActions>(FailedScheduledActionUpdate::Push(
525            FailedScheduledAction {
526                id: envelope.id,
527                action: envelope.action,
528                error,
529            },
530        ));
531        self.store.commit(patch).map(|_| ())
532    }
533}
534
535/// Check whether any Exclusive key appears in more than one command.
536fn has_exclusive_key_overlap(
537    commands: &[(usize, StateCommand)],
538    strategy: impl Fn(&str) -> MergeStrategy,
539) -> bool {
540    let mut seen: HashSet<&str> = HashSet::new();
541    for (_, cmd) in commands {
542        for key in &cmd.patch.touched_keys {
543            if strategy(key) == MergeStrategy::Exclusive && !seen.insert(key.as_str()) {
544                return true;
545            }
546        }
547    }
548    false
549}
550
551/// Greedily partition commands into a compatible batch and deferred hook indices.
552///
553/// Walks commands in registration order. A command is added to the batch if none
554/// of its Exclusive keys overlap with keys already in the batch; otherwise its
555/// hook index is deferred for serial re-execution.
556fn partition_commands(
557    commands: Vec<(usize, StateCommand)>,
558    strategy: impl Fn(&str) -> MergeStrategy,
559) -> (Vec<StateCommand>, Vec<usize>) {
560    let mut batch_exclusive_keys: HashSet<String> = HashSet::new();
561    let mut batch = Vec::new();
562    let mut deferred = Vec::new();
563
564    for (hook_idx, cmd) in commands {
565        let conflicts = cmd.patch.touched_keys.iter().any(|k| {
566            strategy(k) == MergeStrategy::Exclusive && batch_exclusive_keys.contains(k.as_str())
567        });
568
569        if conflicts {
570            deferred.push(hook_idx);
571        } else {
572            for k in &cmd.patch.touched_keys {
573                if strategy(k) == MergeStrategy::Exclusive {
574                    batch_exclusive_keys.insert(k.clone());
575                }
576            }
577            batch.push(cmd);
578        }
579    }
580
581    (batch, deferred)
582}