1use std::collections::HashSet;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use futures::future::join_all;
6use futures::lock::Mutex;
7
8use crate::state::{MergeStrategy, MutationBatch, Snapshot, StateCommand, StateStore};
9use awaken_contract::StateError;
10use awaken_contract::model::{
11 FailedScheduledAction, FailedScheduledActionUpdate, FailedScheduledActions,
12 PendingScheduledActions, Phase, ScheduledActionEnvelope, ScheduledActionQueueUpdate,
13 TypedEffect,
14};
15
16use super::PhaseContext;
17use super::env::{ExecutionEnv, TaggedPhaseHook};
18use super::queue_plugin::RuntimeQueuePlugin;
19use super::reports::{
20 DEFAULT_MAX_PHASE_ROUNDS, EffectDispatchReport, PhaseRunReport, SubmitCommandReport,
21};
22
23#[derive(Clone)]
24pub struct PhaseRuntime {
25 store: StateStore,
26 execution_lock: Arc<Mutex<()>>,
27 next_id: Arc<AtomicU64>,
28}
29
30impl PhaseRuntime {
31 pub fn new(store: StateStore) -> Result<Self, StateError> {
32 match store.install_plugin(RuntimeQueuePlugin) {
33 Ok(()) => {}
34 Err(StateError::PluginAlreadyInstalled { .. }) => {}
35 Err(err) => return Err(err),
36 }
37
38 Ok(Self {
39 store,
40 execution_lock: Arc::new(Mutex::new(())),
41 next_id: Arc::new(AtomicU64::new(1)),
42 })
43 }
44
45 pub fn store(&self) -> &StateStore {
46 &self.store
47 }
48
49 pub async fn submit_command(
50 &self,
51 env: &ExecutionEnv,
52 command: StateCommand,
53 ) -> Result<SubmitCommandReport, StateError> {
54 let _guard = self.execution_lock.lock().await;
55 self.submit_command_inner(env, command).await
56 }
57
58 pub async fn run_phase(
59 &self,
60 env: &ExecutionEnv,
61 phase: Phase,
62 ) -> Result<PhaseRunReport, StateError> {
63 self.run_phase_with_limit(env, phase, DEFAULT_MAX_PHASE_ROUNDS)
64 .await
65 }
66
67 pub async fn run_phase_with_context(
68 &self,
69 env: &ExecutionEnv,
70 ctx: PhaseContext,
71 ) -> Result<PhaseRunReport, StateError> {
72 self.run_phase_ctx_inner(env, ctx, DEFAULT_MAX_PHASE_ROUNDS)
73 .await
74 }
75
76 pub async fn collect_commands(
78 &self,
79 env: &ExecutionEnv,
80 ctx: PhaseContext,
81 ) -> Result<StateCommand, StateError> {
82 self.run_hooks_collect(env, ctx).await
83 }
84
85 pub(crate) async fn run_execute_loop(
91 &self,
92 env: &ExecutionEnv,
93 ctx: PhaseContext,
94 ) -> Result<PhaseRunReport, StateError> {
95 self.run_execute_loop_inner(env, ctx, DEFAULT_MAX_PHASE_ROUNDS)
96 .await
97 }
98
99 pub async fn run_phase_with_limit(
100 &self,
101 env: &ExecutionEnv,
102 phase: Phase,
103 max_rounds: usize,
104 ) -> Result<PhaseRunReport, StateError> {
105 let ctx = PhaseContext::new(phase, self.store.snapshot());
106 self.run_phase_ctx_inner(env, ctx, max_rounds).await
107 }
108
109 async fn run_execute_loop_inner(
112 &self,
113 env: &ExecutionEnv,
114 base_ctx: PhaseContext,
115 max_rounds: usize,
116 ) -> Result<PhaseRunReport, StateError> {
117 let _guard = self.execution_lock.lock().await;
118 self.execute_scheduled_actions(env, &base_ctx, max_rounds)
119 .await
120 }
121
122 async fn run_phase_ctx_inner(
123 &self,
124 env: &ExecutionEnv,
125 base_ctx: PhaseContext,
126 max_rounds: usize,
127 ) -> Result<PhaseRunReport, StateError> {
128 if let Some(token) = base_ctx.cancellation_token.as_ref()
130 && token.is_cancelled()
131 {
132 return Err(StateError::Cancelled);
133 }
134
135 let _guard = self.execution_lock.lock().await;
136
137 let (hook_effects, hook_effect_report) =
138 self.gather_and_commit_hooks(env, &base_ctx).await?;
139
140 if let Some(token) = base_ctx.cancellation_token.as_ref()
142 && token.is_cancelled()
143 {
144 return Err(StateError::Cancelled);
145 }
146
147 let mut report = self
148 .execute_scheduled_actions(env, &base_ctx, max_rounds)
149 .await?;
150
151 report.generated_effects += hook_effects;
152 report.effect_report.attempted += hook_effect_report.attempted;
153 report.effect_report.dispatched += hook_effect_report.dispatched;
154 report.effect_report.failed += hook_effect_report.failed;
155
156 Ok(report)
157 }
158
159 async fn execute_scheduled_actions(
162 &self,
163 env: &ExecutionEnv,
164 base_ctx: &PhaseContext,
165 max_rounds: usize,
166 ) -> Result<PhaseRunReport, StateError> {
167 let phase = base_ctx.phase;
168 let mut total_processed = 0;
169 let mut total_skipped = 0;
170 let mut total_failed = 0;
171 let mut total_effects = 0;
172 let mut effect_report = EffectDispatchReport {
173 attempted: 0,
174 dispatched: 0,
175 failed: 0,
176 };
177 let mut rounds = 0;
178
179 loop {
180 rounds += 1;
181 if rounds > max_rounds {
182 return Err(StateError::PhaseRunLoopExceeded { phase, max_rounds });
183 }
184
185 let queued = self
186 .store
187 .read::<PendingScheduledActions>()
188 .unwrap_or_default();
189
190 let matching: Vec<_> = queued
191 .into_iter()
192 .filter(|envelope| {
193 envelope.action.phase == phase
194 && env
195 .scheduled_action_handlers
196 .contains_key(&envelope.action.key)
197 })
198 .collect();
199
200 tracing::debug!(phase = ?phase, actions = matching.len(), "execute_scheduled_actions");
201
202 if matching.is_empty() {
203 if rounds == 1 {
204 total_skipped = self
205 .store
206 .read::<PendingScheduledActions>()
207 .unwrap_or_default()
208 .iter()
209 .filter(|envelope| envelope.action.phase != phase)
210 .count();
211 }
212 break;
213 }
214
215 for envelope in matching {
216 let handler = env
217 .scheduled_action_handlers
218 .get(&envelope.action.key)
219 .cloned()
220 .expect("handler existence verified in filter above");
221
222 let ctx = base_ctx.clone().with_snapshot(self.store.snapshot());
223 let mut command = match handler
224 .handle_erased(&ctx, envelope.action.payload.clone())
225 .await
226 {
227 Ok(command) => command,
228 Err(err) => {
229 self.dead_letter(envelope, err.to_string())?;
230 total_failed += 1;
231 continue;
232 }
233 };
234 total_effects += command.effects.len();
235 command.patch.update::<PendingScheduledActions>(
236 ScheduledActionQueueUpdate::Remove { id: envelope.id },
237 );
238 match self.submit_command_inner(env, command).await {
239 Ok(report) => {
240 total_processed += 1;
241 effect_report.attempted += report.effect_report.attempted;
242 effect_report.dispatched += report.effect_report.dispatched;
243 effect_report.failed += report.effect_report.failed;
244 }
245 Err(err) => {
246 self.dead_letter(
247 envelope,
248 format!("failed to submit action command: {err}"),
249 )?;
250 total_failed += 1;
251 }
252 }
253 }
254 }
255
256 Ok(PhaseRunReport {
257 phase,
258 rounds,
259 processed_scheduled_actions: total_processed,
260 skipped_scheduled_actions: total_skipped,
261 failed_scheduled_actions: total_failed,
262 generated_effects: total_effects,
263 effect_report,
264 })
265 }
266
267 async fn submit_command_inner(
268 &self,
269 env: &ExecutionEnv,
270 mut command: StateCommand,
271 ) -> Result<SubmitCommandReport, StateError> {
272 for action in &command.scheduled_actions {
274 if !env.scheduled_action_handlers.contains_key(&action.key) {
275 return Err(StateError::UnknownScheduledActionHandler {
276 key: action.key.clone(),
277 });
278 }
279 }
280 for effect in &command.effects {
282 if !env.effect_handlers.contains_key(&effect.key) {
283 return Err(StateError::UnknownEffectHandler {
284 key: effect.key.clone(),
285 });
286 }
287 }
288
289 for action in command.scheduled_actions.drain(..) {
290 let entry = ScheduledActionEnvelope {
291 id: self.next_id.fetch_add(1, Ordering::SeqCst),
292 action,
293 };
294 tracing::debug!(
295 id = entry.id,
296 phase = ?entry.action.phase,
297 key = %entry.action.key,
298 "scheduled action enqueued"
299 );
300 command
301 .patch
302 .update::<PendingScheduledActions>(ScheduledActionQueueUpdate::Push(entry));
303 }
304
305 let mut effects = Vec::new();
306 for effect in command.effects.drain(..) {
307 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
308 tracing::debug!(id, key = %effect.key, "effect dispatching");
309 effects.push(effect);
310 }
311
312 let revision = self.store.commit(command.patch)?;
313 let snapshot = self.store.snapshot();
314 let effect_report = self.dispatch_effects(env, &effects, &snapshot).await;
315 Ok(SubmitCommandReport {
316 revision,
317 effect_report,
318 })
319 }
320
321 async fn dispatch_effects(
322 &self,
323 env: &ExecutionEnv,
324 effects: &[TypedEffect],
325 snapshot: &Snapshot,
326 ) -> EffectDispatchReport {
327 let mut report = EffectDispatchReport {
328 attempted: 0,
329 dispatched: 0,
330 failed: 0,
331 };
332
333 for effect in effects {
334 report.attempted += 1;
335 let Some(handler) = env.effect_handlers.get(&effect.key) else {
336 report.failed += 1;
337 continue;
338 };
339
340 match handler
341 .handle_erased(effect.payload.clone(), snapshot)
342 .await
343 {
344 Ok(()) => report.dispatched += 1,
345 Err(_) => report.failed += 1,
346 }
347 }
348
349 report
350 }
351
352 async fn run_hooks_collect(
355 &self,
356 env: &ExecutionEnv,
357 ctx: PhaseContext,
358 ) -> Result<StateCommand, StateError> {
359 let snapshot = self.store.snapshot();
360 let hooks = Self::filter_hooks(env, &ctx);
361 let indexed = Self::run_hooks_indexed(&hooks, &ctx, &snapshot).await?;
362 let commands = indexed.into_iter().map(|(_, cmd)| cmd).collect();
363 self.store.merge_all_commands(commands)
364 }
365
366 async fn gather_and_commit_hooks(
377 &self,
378 env: &ExecutionEnv,
379 base_ctx: &PhaseContext,
380 ) -> Result<(usize, EffectDispatchReport), StateError> {
381 let hooks = Self::filter_hooks(env, base_ctx);
382 if hooks.is_empty() {
383 return Ok((
384 0,
385 EffectDispatchReport {
386 attempted: 0,
387 dispatched: 0,
388 failed: 0,
389 },
390 ));
391 }
392
393 tracing::debug!(phase = ?base_ctx.phase, hooks = hooks.len(), "gather_start");
394
395 let snapshot = self.store.snapshot();
396 let indexed = Self::run_hooks_indexed(&hooks, base_ctx, &snapshot).await?;
397
398 if indexed.is_empty() {
399 return Ok((
400 0,
401 EffectDispatchReport {
402 attempted: 0,
403 dispatched: 0,
404 failed: 0,
405 },
406 ));
407 }
408
409 let has_conflicts = {
411 let registry = self.store.registry.lock();
412 has_exclusive_key_overlap(&indexed, |k| registry.merge_strategy(k))
413 };
414
415 let mut total_effects = 0;
416 let mut effect_report = EffectDispatchReport {
417 attempted: 0,
418 dispatched: 0,
419 failed: 0,
420 };
421
422 if !has_conflicts {
423 let commands = indexed.into_iter().map(|(_, cmd)| cmd).collect();
424 let merged = self.store.merge_all_commands(commands)?;
425 if !merged.is_empty() {
426 total_effects += merged.effects.len();
427 let r = self.submit_command_inner(env, merged).await?;
428 effect_report.attempted += r.effect_report.attempted;
429 effect_report.dispatched += r.effect_report.dispatched;
430 effect_report.failed += r.effect_report.failed;
431 }
432 return Ok((total_effects, effect_report));
433 }
434
435 tracing::warn!(phase = ?base_ctx.phase, "exclusive_conflict_fallback");
437 let (batch_commands, deferred_indices) = {
438 let registry = self.store.registry.lock();
439 partition_commands(indexed, |k| registry.merge_strategy(k))
440 };
441
442 if !batch_commands.is_empty() {
444 let merged = self.store.merge_all_commands(batch_commands)?;
445 if !merged.is_empty() {
446 total_effects += merged.effects.len();
447 let r = self.submit_command_inner(env, merged).await?;
448 effect_report.attempted += r.effect_report.attempted;
449 effect_report.dispatched += r.effect_report.dispatched;
450 effect_report.failed += r.effect_report.failed;
451 }
452 }
453
454 for hook_idx in deferred_indices {
456 let snap = self.store.snapshot();
457 let ctx = base_ctx.clone().with_snapshot(snap.clone());
458 let mut cmd = hooks[hook_idx].hook.run(&ctx).await?;
459 if cmd.base_revision().is_none() {
460 cmd = cmd.with_base_revision(snap.revision());
461 }
462 if !cmd.is_empty() {
463 total_effects += cmd.effects.len();
464 let r = self.submit_command_inner(env, cmd).await?;
465 effect_report.attempted += r.effect_report.attempted;
466 effect_report.dispatched += r.effect_report.dispatched;
467 effect_report.failed += r.effect_report.failed;
468 }
469 }
470
471 Ok((total_effects, effect_report))
472 }
473
474 fn filter_hooks<'a>(env: &'a ExecutionEnv, ctx: &PhaseContext) -> Vec<&'a TaggedPhaseHook> {
475 let hooks = env.hooks_for_phase(ctx.phase);
476 let active_hook_filter = &ctx.agent_spec.active_hook_filter;
477 hooks
478 .iter()
479 .filter(|tagged| {
480 active_hook_filter.is_empty() || active_hook_filter.contains(&tagged.plugin_id)
481 })
482 .collect()
483 }
484
485 async fn run_hooks_indexed(
487 hooks: &[&TaggedPhaseHook],
488 base_ctx: &PhaseContext,
489 snapshot: &Snapshot,
490 ) -> Result<Vec<(usize, StateCommand)>, StateError> {
491 let results = join_all(hooks.iter().enumerate().map(|(i, tagged)| {
492 let hook = tagged.hook.clone();
493 let hook_snapshot = snapshot.clone();
494 let hook_ctx = base_ctx.clone().with_snapshot(hook_snapshot.clone());
495 async move {
496 let mut cmd = hook.run(&hook_ctx).await?;
497 if cmd.base_revision().is_none() {
498 cmd = cmd.with_base_revision(hook_snapshot.revision());
499 }
500 Ok::<(usize, StateCommand), StateError>((i, cmd))
501 }
502 }))
503 .await;
504
505 let mut indexed = Vec::new();
506 for result in results {
507 let (i, cmd) = result?;
508 if !cmd.is_empty() {
509 indexed.push((i, cmd));
510 }
511 }
512 Ok(indexed)
513 }
514
515 fn dead_letter(
516 &self,
517 envelope: ScheduledActionEnvelope,
518 error: String,
519 ) -> Result<(), StateError> {
520 let mut patch = MutationBatch::new();
521 patch.update::<PendingScheduledActions>(ScheduledActionQueueUpdate::Remove {
522 id: envelope.id,
523 });
524 patch.update::<FailedScheduledActions>(FailedScheduledActionUpdate::Push(
525 FailedScheduledAction {
526 id: envelope.id,
527 action: envelope.action,
528 error,
529 },
530 ));
531 self.store.commit(patch).map(|_| ())
532 }
533}
534
535fn has_exclusive_key_overlap(
537 commands: &[(usize, StateCommand)],
538 strategy: impl Fn(&str) -> MergeStrategy,
539) -> bool {
540 let mut seen: HashSet<&str> = HashSet::new();
541 for (_, cmd) in commands {
542 for key in &cmd.patch.touched_keys {
543 if strategy(key) == MergeStrategy::Exclusive && !seen.insert(key.as_str()) {
544 return true;
545 }
546 }
547 }
548 false
549}
550
551fn partition_commands(
557 commands: Vec<(usize, StateCommand)>,
558 strategy: impl Fn(&str) -> MergeStrategy,
559) -> (Vec<StateCommand>, Vec<usize>) {
560 let mut batch_exclusive_keys: HashSet<String> = HashSet::new();
561 let mut batch = Vec::new();
562 let mut deferred = Vec::new();
563
564 for (hook_idx, cmd) in commands {
565 let conflicts = cmd.patch.touched_keys.iter().any(|k| {
566 strategy(k) == MergeStrategy::Exclusive && batch_exclusive_keys.contains(k.as_str())
567 });
568
569 if conflicts {
570 deferred.push(hook_idx);
571 } else {
572 for k in &cmd.patch.touched_keys {
573 if strategy(k) == MergeStrategy::Exclusive {
574 batch_exclusive_keys.insert(k.clone());
575 }
576 }
577 batch.push(cmd);
578 }
579 }
580
581 (batch, deferred)
582}