Skip to main content

bb_runtime/engine/
poll.rs

1//! 8-phase poll cycle + `handle_completion` per `docs/ENGINE.md`
2//! §7 + §9.
3use crate::engine::core::Engine;
4use crate::engine::step::EngineStep;
5use crate::framework::scheduler::TimerKind;
6use crate::ids::{CommandId, NodeSiteId};
7use crate::ingress::IngressEvent;
8use crate::slot_value::SlotValue;
9use crate::syscall::values::{BytesValue, WireReqIdValue};
10
11impl Engine {
12    /// Handle a CommandId completion per ENGINE.md §9.2.
13    /// Writes the values into the suspended Op's output sites +
14    /// pushes ready downstream consumers onto the frontier.
15    pub fn handle_completion(
16        &mut self,
17        cmd_id: CommandId,
18        values: Vec<(String, Box<dyn SlotValue>)>,
19    ) -> Vec<EngineStep> {
20        let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
21            // No matching suspension - completion arrived for a
22            // CommandId the engine doesn't know. Silently drop;
23            return Vec::new();
24        };
25
26        let mut steps = Vec::new();
27
28        // Move output_sites out of pending (pending is owned via
29        // pending_async.remove above). Helpers take &[NodeSiteId]
30        // borrows; the final step consumes by value.
31        let sites: Vec<NodeSiteId> = pending.output_sites;
32        for (i, (_name, value)) in values.into_iter().enumerate() {
33            if let Some(site) = sites.get(i).copied() {
34                self.exec
35                    .slot_table
36                    .insert((site, pending.exec_id), Some(value));
37            }
38        }
39
40        // Push ready downstream consumers.
41        self.push_ready_consumers(&sites, pending.exec_id);
42
43        // Function-call splice: async completion arriving inside a
44        // body's derived ExecId forwards to the caller's slots per
45        // ENGINE.md §8.4. No-op when there's no pending call context.
46        self.forward_outputs_to_caller(&sites, pending.exec_id);
47
48        // Surface top-level function outputs (no in-function consumer)
49        // as AppEvents - same path as `Engine::write_outputs`, so
50        // async-completion writes participate in the canonical
51        // function-signature → engine I/O contract.
52        self.surface_top_level_outputs(&sites, pending.exec_id);
53
54        steps.push(EngineStep::OpCompleted {
55            op_ref: pending.op_ref,
56            exec_id: pending.exec_id,
57            sites_written: sites,
58        });
59        steps
60    }
61
62    /// Handle a transport-reported failure for a suspended
63    /// `CommandId`. The Op that was waiting on `cmd_id` fails
64    /// through the existing `OpFailed` path (bus
65    /// `InfraEvent::OpFailure` + `EngineStep::OpFailed`). Use this
66    /// when the host's transport adapter learns that the remote
67    /// side failed to produce a result - the framework no longer
68    /// silently swallows the outcome.
69    pub fn handle_completion_failed(
70        &mut self,
71        cmd_id: CommandId,
72        error: crate::bus::OpError,
73    ) -> Vec<EngineStep> {
74        let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
75            // No matching suspension - failure arrived for a
76            // CommandId the engine doesn't know. Silently drop;
77            // the host's transport reconciliation should have
78            // caught this earlier.
79            return Vec::new();
80        };
81        vec![self.fail_op(
82            pending.op_ref,
83            pending.exec_id,
84            crate::bus::OpErrorKind::RemoteFailed,
85            "completion_failed",
86            error.detail,
87        )]
88    }
89
90    /// Expire any pending async suspensions whose `deadline_ns` is
91    /// past `scheduler.now_ns()`. Each expired suspension fails via
92    /// the existing `OpFailed` surface with
93    /// `OpError("deadline exceeded")`. Returns the resulting steps.
94    /// Called from Phase 5 of the poll cycle before draining
95    /// `pending_completions`, so deadline failures land in the
96    /// same poll where they expire.
97    fn expire_deadlines(&mut self) -> Vec<EngineStep> {
98        let now_ns = self.framework.scheduler.now_ns();
99        let expired: Vec<CommandId> = self
100            .exec
101            .pending_async
102            .iter()
103            .filter_map(|(cmd, p)| match p.deadline_ns {
104                Some(d) if d <= now_ns => Some(*cmd),
105                _ => None,
106            })
107            .collect();
108        let mut steps = Vec::new();
109        for cmd in expired {
110            if let Some(p) = self.exec.pending_async.remove(&cmd) {
111                steps.push(self.fail_op(
112                    p.op_ref,
113                    p.exec_id,
114                    crate::bus::OpErrorKind::Timeout,
115                    "deadline_exceeded",
116                    "deadline exceeded".to_string(),
117                ));
118            }
119        }
120
121        // Drain stale in-flight wire requests. Each evicted entry
122        // surfaces as `EngineStep::WireTimeout` for observability;
123        // if it carried a `parked_op`, fail the originator's local
124        // continuation with "chain timeout" so it doesn't sit
125        // parked forever.
126        let drained = self.framework.request_tracker.drain_stale(now_ns);
127        for (wire_req_id, entry) in drained {
128            steps.push(EngineStep::WireTimeout {
129                wire_req_id,
130                target_site: entry.target_site,
131                started_at_ns: entry.started_at_ns,
132                parked_op: entry.parked_op,
133            });
134            if let Some(cmd) = entry.parked_op {
135                if let Some(p) = self.exec.pending_async.remove(&cmd) {
136                    steps.push(self.fail_op(
137                        p.op_ref,
138                        p.exec_id,
139                        crate::bus::OpErrorKind::Timeout,
140                        "chain_timeout",
141                        "chain timeout".to_string(),
142                    ));
143                }
144            }
145        }
146        steps
147    }
148
149    /// 8-phase poll cycle per ENGINE.md §7.
150    pub fn poll(&mut self) -> Vec<EngineStep> {
151        let _poll_span = tracing::debug_span!("engine.poll").entered();
152        // GC executions that finished in the previous cycle. The
153        // one-cycle delay lets the host read completion state via
154        // `slot_at` between polls; production consumers read via the
155        // `EngineStep` stream so the delay is invisible to them.
156        self.gc_completed_executions();
157        let mut steps = Vec::new();
158        // Per-poll counter used by `cycle_op_budget` enforcement.
159        // Increments once per `invoke_one` call across Phases 2, 6,
160        // and 7. When the budget is hit, the current drain loop
161        // breaks and `CycleBudgetExceeded` is appended once.
162        let mut ops_invoked: usize = 0;
163        let mut budget_exceeded = false;
164
165        // Drive any host-supplied bootstrap requests staged between
166        // polls. The host kicks the install-order queue via
167        // `Node::run_bootstrap` (which seeds before this poll runs)
168        // and stages targets-with-inputs via `enqueue_bootstrap_request`;
169        // both paths land here so disjoint targets fire alongside the
170        // already-seeded phase and overlapping ones park on
171        // `bootstrap.waiting` until `maybe_complete_bootstrap`
172        // promotes them. Install no longer auto-seeds — the host
173        // owns when bootstrap starts.
174        self.drive_pending_bootstrap_requests();
175        let bootstrap_was_pending = self.bootstrap.pending;
176        // Per-phase BootstrapComplete steps accumulate here as each
177        // queued key drains. The final `WaitingOnBootstrap` /
178        // terminal `BootstrapComplete` decision below uses
179        // `bootstrap_was_pending` + post-drain `bootstrap_pending` to
180        // detect a partial drain (queue not yet empty, async pending).
181        let mut bootstrap_phases_completed: usize = 0;
182
183        // While bootstrap is pending the ingress drain consumes only
184        // events that can advance the bootstrap call (its async
185        // completions, transport failures). Body-side events
186        // (AppEvent, EnvelopeFrom, Invoke) requeue so the host
187        // observes the same pre-bootstrap delivery order on the
188        // cycle after BootstrapComplete fires. The loop re-drains
189        // when bootstrap completes mid-pass so body events queued
190        // before bootstrap finished still process in this cycle.
191        //
192        // Snapshot the pre-drain depth so per-envelope handlers see
193        // the receiver's overload signal even after the drain runs the
194        // queue to zero.
195        self.phase1_pre_drain_depth = self.ingress.len();
196        {
197            let _phase1 = tracing::debug_span!("engine.phase1_ingress").entered();
198            loop {
199                let was_pending = self.bootstrap.pending;
200                let ingress_events = self.ingress.drain_all();
201                if ingress_events.is_empty() {
202                    break;
203                }
204                for event in ingress_events {
205                    if self.bootstrap.pending && self.is_body_phase_ingress(&event) {
206                        let _ = self.ingress.push(event);
207                        continue;
208                    }
209                    steps.extend(self.process_ingress_event(event));
210                    if self.maybe_complete_bootstrap() {
211                        bootstrap_phases_completed += 1;
212                        self.seed_bootstrap_call();
213                    }
214                }
215                if !was_pending || self.bootstrap.pending {
216                    break;
217                }
218            }
219        }
220
221        {
222            let _phase2 = tracing::debug_span!("engine.phase2_frontier_drain").entered();
223            loop {
224                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
225                    let step = self.invoke_one(op_ref, exec_id);
226                    steps.push(step);
227                    ops_invoked += 1;
228                    if budget_hit(self.cycle_op_budget, ops_invoked) {
229                        budget_exceeded = true;
230                        break;
231                    }
232                }
233                // If a queued bootstrap phase just drained, re-seed
234                // the next one and cascade in-cycle so the host sees
235                // every BootstrapComplete + the body's first ops in a
236                // single poll when budget permits.
237                if self.maybe_complete_bootstrap() {
238                    bootstrap_phases_completed += 1;
239                    if budget_exceeded {
240                        break;
241                    }
242                    if !self.seed_bootstrap_call() {
243                        break;
244                    }
245                } else {
246                    break;
247                }
248            }
249        }
250
251        // Before draining, surface any FIFO drops accumulated since
252        // the last poll as an `InfraEvent::BusOverflow`. Publishing
253        // before drain keeps the event in this cycle's routing pass.
254        let bus_dropped = self.bus.take_dropped_count();
255        if bus_dropped > 0 {
256            self.bus.publish(crate::bus::NodeEvent::Infra(
257                crate::bus::InfraEvent::BusOverflow { count: bus_dropped },
258            ));
259        }
260        // For every NodeEvent on the bus, derive its `kind` string
261        // (via NodeEvent::kind()) and look up the subscribed
262        // `NodeSiteId`s. For each site, write a `TriggerValue` at a
263        // fresh `ExecId` and push the site's downstream consumers
264        // onto the frontier. This matches the wire delivery
265        // semantics per `docs/ADDRESSING.md`.
266        let events = self.bus.drain();
267        if !events.is_empty() {
268            let mut to_seed: Vec<crate::ids::NodeSiteId> = Vec::new();
269            for event in events {
270                let kind = event.kind();
271                let Some(sites) = self.event_subscriptions.get(kind) else {
272                    continue;
273                };
274                to_seed.extend(sites.iter().copied());
275            }
276            for site in to_seed {
277                let exec_id = self.allocate_exec_id();
278                let value: Box<dyn crate::slot_value::SlotValue> =
279                    Box::new(crate::syscall::values::TriggerValue);
280                self.exec.slot_table.insert((site, exec_id), Some(value));
281                let consumers: Vec<crate::ids::OpRef> = self
282                    .graphs_iter()
283                    .filter_map(|g| g.consumers.get(&site).cloned())
284                    .flatten()
285                    .collect();
286                for op_ref in consumers {
287                    self.exec.frontier.push_back((op_ref, exec_id));
288                }
289            }
290        }
291
292        let now_ns = self.framework.scheduler.now_ns();
293        let matured = self.framework.scheduler.poll_matured(now_ns);
294        for kind in matured {
295            self.handle_matured_timer(kind);
296        }
297
298        // Engine-side deadline scan runs first so an expired
299        // suspension fails this cycle even if a (now-stale)
300        // completion is also queued.
301        {
302            let _phase5 = tracing::debug_span!("engine.phase5_completions").entered();
303            steps.extend(self.expire_deadlines());
304            let completions = std::mem::take(&mut self.exec.pending_completions);
305            for c in completions {
306                steps.extend(self.handle_completion(c.cmd_id, c.results));
307            }
308            if self.maybe_complete_bootstrap() {
309                bootstrap_phases_completed += 1;
310                self.seed_bootstrap_call();
311            }
312        }
313
314        // tracker entry and publish bus events on state changes.
315        {
316            let now_ns = self.framework.scheduler.now_ns();
317            let transitions = self.framework.rtt_tracker.scan_phi(now_ns);
318            for transition in transitions {
319                let event = match transition {
320                    crate::framework::rtt_tracker::PhiTransition::Suspect { site, phi } => {
321                        crate::bus::InfraEvent::PeerSuspect { site, phi }
322                    }
323                    crate::framework::rtt_tracker::PhiTransition::Down { site, phi } => {
324                        crate::bus::InfraEvent::PeerDown { site, phi }
325                    }
326                    crate::framework::rtt_tracker::PhiTransition::Live { site } => {
327                        crate::bus::InfraEvent::PeerLive { site }
328                    }
329                };
330                self.bus.publish(crate::bus::NodeEvent::Infra(event));
331            }
332        }
333
334        if !budget_exceeded {
335            loop {
336                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
337                    let step = self.invoke_one(op_ref, exec_id);
338                    steps.push(step);
339                    ops_invoked += 1;
340                    if budget_hit(self.cycle_op_budget, ops_invoked) {
341                        budget_exceeded = true;
342                        break;
343                    }
344                }
345                if self.maybe_complete_bootstrap() {
346                    bootstrap_phases_completed += 1;
347                    if budget_exceeded {
348                        break;
349                    }
350                    if !self.seed_bootstrap_call() {
351                        break;
352                    }
353                } else {
354                    break;
355                }
356            }
357        }
358
359        // For each phase queued by the host via
360        // `Engine::fire_lifecycle(phase)`, push every enrolled
361        // `LifecyclePhase` op onto the frontier with a fresh ExecId
362        // and emit a `LifecycleFired` step. Cascade-drain so newly
363        // pushed ops invoke in this same poll cycle.
364        let fired: Vec<String> = std::mem::take(&mut self.fired_phases);
365        for phase in &fired {
366            let op_refs: Vec<crate::ids::OpRef> =
367                self.lifecycle_table.get(phase).cloned().unwrap_or_default();
368            let pairs: Vec<(crate::ids::OpRef, crate::ids::ExecId)> = op_refs
369                .into_iter()
370                .map(|op_ref| (op_ref, self.allocate_exec_id()))
371                .collect();
372            for (op_ref, exec_id) in pairs {
373                self.exec.frontier.push_back((op_ref, exec_id));
374            }
375            steps.push(EngineStep::LifecycleFired {
376                phase: phase.clone(),
377            });
378        }
379        if !budget_exceeded {
380            loop {
381                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
382                    let step = self.invoke_one(op_ref, exec_id);
383                    steps.push(step);
384                    ops_invoked += 1;
385                    if budget_hit(self.cycle_op_budget, ops_invoked) {
386                        budget_exceeded = true;
387                        break;
388                    }
389                }
390                if self.maybe_complete_bootstrap() {
391                    bootstrap_phases_completed += 1;
392                    if budget_exceeded {
393                        break;
394                    }
395                    if !self.seed_bootstrap_call() {
396                        break;
397                    }
398                } else {
399                    break;
400                }
401            }
402        }
403
404        let _phase8 = tracing::debug_span!("engine.phase8_outbound").entered();
405        for env in self.framework.outbound_queue.drain_all() {
406            steps.push(EngineStep::SendEnvelope(env));
407        }
408        // Surface peer-resolution failures captured by the wire
409        // syscall during this poll. Each entry becomes a dedicated
410        // EngineStep::PeerResolveFailed; the matching bus event was
411        // already published by the syscall, so subscribers got the
412        // routable mirror in real time.
413        for (peer, op_ref) in self.framework.pending_peer_resolve_failures.drain(..) {
414            steps.push(EngineStep::PeerResolveFailed {
415                peer,
416                op_ref,
417                exec_id: crate::ids::ExecId::from(0u64),
418            });
419        }
420        // Emit a single `OutboundDropped` step capturing FIFO drops
421        // that accumulated since the previous poll (e.g., when push
422        // exceeded `max_outbound_queue`).
423        let dropped = self.framework.outbound_queue.take_dropped_count();
424        if dropped > 0 {
425            steps.push(EngineStep::OutboundDropped { count: dropped });
426        }
427        // Surface a single `CycleBudgetExceeded` at the end so the
428        // host knows to re-poll. The step is appended after all
429        // observable per-op steps so the budget signal trails the
430        // in-cycle work it bounded.
431        if budget_exceeded {
432            steps.push(EngineStep::CycleBudgetExceeded { ops_invoked });
433        }
434        // Bootstrap state observable. Each queued bootstrap key that
435        // drained during this cycle emits its own `BootstrapComplete`
436        // — multi-target installs surface one signal per target in
437        // install order. `WaitingOnBootstrap` lands when the
438        // *currently* in-flight bootstrap op suspended on async
439        // completion and the host must drive the resumption before
440        // re-polling. Body-phase ops are gated from firing while
441        // `bootstrap_pending` is set.
442        if bootstrap_was_pending {
443            for _ in 0..bootstrap_phases_completed {
444                steps.push(EngineStep::BootstrapComplete);
445            }
446            if self.bootstrap.pending {
447                steps.push(EngineStep::WaitingOnBootstrap);
448            }
449        }
450        // Drain AppEmit / AppNotify syscall outputs into
451        // EngineStep::AppEvent. `Emit` carries serialized payload
452        // bytes; `Notify` is a marker-only event (empty `value_bytes`).
453        for ev in std::mem::take(&mut self.framework.pending_app_events) {
454            let (module_name, topic, value_bytes) = match ev {
455                crate::bus::AppEvent::Emit { name, value_bytes } => {
456                    (String::new(), name, value_bytes)
457                }
458                crate::bus::AppEvent::Notify { name } => (String::new(), name, Vec::new()),
459            };
460            steps.push(EngineStep::AppEvent {
461                module_name,
462                topic,
463                value_bytes,
464            });
465        }
466
467        steps
468    }
469
470    /// Whether an ingress event would seed body-phase work. The
471    /// bootstrap gate requeues these while the bootstrap call is
472    /// outstanding so app-event delivery + envelope routing
473    /// observe the post-bootstrap engine state. Bootstrap-resuming
474    /// completions, transport failures, and host-injected timer
475    /// matures bypass the gate so the bootstrap call can progress.
476    fn is_body_phase_ingress(&self, event: &IngressEvent) -> bool {
477        matches!(
478            event,
479            IngressEvent::AppEvent { .. }
480                | IngressEvent::EnvelopeFrom { .. }
481                | IngressEvent::Invoke { .. }
482        )
483    }
484
485    /// Ingress-event router. Dispatches each event variant to its
486    /// handler: envelopes route to `deliver_inbound_internal`,
487    /// completions to `handle_completion`, app events to the bus,
488    /// matured timers to `handle_matured_timer`, invoke events to
489    /// the per-module entry point.
490    fn process_ingress_event(&mut self, event: IngressEvent) -> Vec<EngineStep> {
491        match event {
492            IngressEvent::Completion { cmd_id, results } => {
493                // The host's transport pre-decodes opaque payloads
494                // and hands them as `Vec<Vec<u8>>`. The engine wraps
495                // each entry as a `BytesValue` and forwards to
496                // `handle_completion`, which writes the slots and
497                // pushes downstream consumers.
498                let typed_results: Vec<(String, Box<dyn crate::slot_value::SlotValue>)> = results
499                    .into_iter()
500                    .enumerate()
501                    .map(|(i, bytes)| {
502                        let value = crate::syscall::values::BytesValue(bytes);
503                        (
504                            format!("out_{i}"),
505                            Box::new(value) as Box<dyn crate::slot_value::SlotValue>,
506                        )
507                    })
508                    .collect();
509                self.handle_completion(cmd_id, typed_results)
510            }
511            IngressEvent::EnvelopeFrom {
512                src_peer,
513                envelope,
514                src_observed_address,
515            } => {
516                // Backpressure pre-flight: silent-drop senders bypass
517                // dispatch; others route normally and then check the
518                // post-pop ingress depth against the high-water mark
519                // to emit a single BackoffNotice.
520                if self
521                    .framework
522                    .peer_state
523                    .backpressure
524                    .is_silent_drop_active(src_peer)
525                {
526                    return Vec::new();
527                }
528                // Order: claimed (envelope) first so the entry
529                // exists for the observed-address registration step.
530                // Observed wins for NAT-translated cases because it
531                // appends a fresh address the claimed snapshot
532                // cannot know.
533                //
534                // The address-book hint is best-effort under allocator
535                // pressure: if the dedup buffer cannot be reserved we
536                // drop the hint, emit `WireReceiveError::AllocationFailed`,
537                // and continue routing — fills do not depend on the
538                // address book.
539                let mut steps = Vec::new();
540                if let Err(alloc) =
541                    self.merge_src_peer_addresses(src_peer, &envelope.src_peer_addresses)
542                {
543                    steps.push(self.emit_wire_receive_error(
544                        Some(src_peer),
545                        0,
546                        0,
547                        alloc.byte_count,
548                        crate::bus::WireReceiveErrorKind::AllocationFailed {
549                            byte_count: alloc.byte_count,
550                            reason: alloc.reason,
551                        },
552                    ));
553                }
554                if let Some(observed) = src_observed_address {
555                    self.merge_src_observed_address(src_peer, observed);
556                }
557                steps.extend(self.route_envelope(envelope, Some(src_peer)));
558                let backoff_steps = self.maybe_emit_backoff_notice(
559                    src_peer,
560                    crate::framework::BackoffCause::QueueFull,
561                    None,
562                );
563                steps.extend(backoff_steps);
564                steps
565            }
566            IngressEvent::AppEvent {
567                module_name,
568                input_name,
569                value_bytes,
570            } => self.deliver_app_event(module_name, input_name, value_bytes),
571            IngressEvent::Invoke {
572                module_name,
573                inputs,
574                exec_id,
575            } => self.deliver_invoke(module_name, inputs, exec_id),
576            IngressEvent::TimerMatured { at_ns } => {
577                self.framework.scheduler.set_now(at_ns);
578                let matured = self.framework.scheduler.poll_matured(at_ns);
579                let mut out = Vec::new();
580                for kind in matured {
581                    out.extend(self.handle_matured_timer(kind));
582                }
583                out
584            }
585            IngressEvent::CompletionFailed { cmd_id, detail } => {
586                // Async-completion FAILURE: route to the typed
587                // OpFailed path via handle_completion_failed so the
588                // parked op fails as itself rather than as a
589                // success-bytes payload.
590                self.handle_completion_failed(
591                    cmd_id,
592                    crate::bus::OpError {
593                        detail,
594                        ..Default::default()
595                    },
596                )
597            }
598            IngressEvent::SendFailed {
599                wire_req_id,
600                peer: _peer,
601                reason: _reason,
602            } => {
603                // Transport-side delivery failure. Consumes the
604                // in-flight registration so the request tracker doesn't
605                // leak the entry; the parked originator op's failure is
606                // surfaced by the wire-timeout drain (`drain_stale`)
607                // rather than waiting for the TTL to elapse.
608                let now_ns = self.framework.scheduler.now_ns();
609                let _ = self
610                    .framework
611                    .request_tracker
612                    .observe_response(wire_req_id, now_ns);
613                Vec::new()
614            }
615            IngressEvent::AppIngressError {
616                source,
617                byte_count,
618                kind,
619            } => {
620                // Cross-thread bridge for `CompletionSink::complete`
621                // exceeding the per-completion result cap. Re-publish
622                // on the bus so subscribers see the rejection
623                // alongside the synchronous emissions from
624                // `Node::deliver_event` / `Node::invoke`.
625                self.bus.publish(crate::bus::NodeEvent::Infra(
626                    crate::bus::InfraEvent::AppIngressError {
627                        source,
628                        byte_count,
629                        kind,
630                    },
631                ));
632                Vec::new()
633            }
634        }
635    }
636
637    /// Deliver an inbound `AppEvent`: look up the addressed graph,
638    /// resolve the `input_name` to a `NodeSiteId`, wrap the bytes as
639    /// a `BytesValue`, seed the slot at a fresh `ExecId`, and push
640    /// ready downstream consumers onto the frontier. Surfaces an
641    /// observable `EngineStep::AppEvent` so the host can confirm
642    /// delivery even if no consumer exists yet.
643    fn deliver_app_event(
644        &mut self,
645        module_name: String,
646        input_name: String,
647        value_bytes: Vec<u8>,
648    ) -> Vec<EngineStep> {
649        let step = EngineStep::AppEvent {
650            module_name: module_name.clone(),
651            topic: input_name.clone(),
652            value_bytes: value_bytes.clone(),
653        };
654        let Some(graph) = self.graph(&module_name) else {
655            return vec![step];
656        };
657        let Some(&site) = graph.site_names.get(&input_name) else {
658            return vec![step];
659        };
660        let exec_id = self.allocate_exec_id();
661        let value = crate::syscall::values::BytesValue(value_bytes);
662        self.exec
663            .slot_table
664            .insert((site, exec_id), Some(Box::new(value)));
665        self.push_ready_consumers(&[site], exec_id);
666        vec![step]
667    }
668
669    /// Deliver an inbound `Invoke`: seed every `(input_name,
670    /// value_bytes)` pair into the addressed graph's matching site
671    /// at the supplied `exec_id`, then push the ready consumers
672    /// onto the frontier. Unknown modules / unknown input names are
673    /// silent no-ops (the host can detect via subsequent polls
674    /// producing no steps).
675    fn deliver_invoke(
676        &mut self,
677        module_name: String,
678        inputs: Vec<(String, Vec<u8>)>,
679        exec_id: crate::ids::ExecId,
680    ) -> Vec<EngineStep> {
681        let Some(graph) = self.graph(&module_name) else {
682            return Vec::new();
683        };
684        let mut seeded_sites: Vec<crate::ids::NodeSiteId> = Vec::new();
685        let pairs: Vec<(crate::ids::NodeSiteId, Vec<u8>)> = inputs
686            .into_iter()
687            .filter_map(|(name, bytes)| graph.site_names.get(&name).map(|&site| (site, bytes)))
688            .collect();
689        for (site, bytes) in pairs {
690            let value = crate::syscall::values::BytesValue(bytes);
691            self.exec
692                .slot_table
693                .insert((site, exec_id), Some(Box::new(value)));
694            seeded_sites.push(site);
695        }
696        if !seeded_sites.is_empty() {
697            self.push_ready_consumers(&seeded_sites, exec_id);
698        }
699        Vec::new()
700    }
701
702    /// Route a single inbound envelope. Iterates each `SlotFill` and
703    /// dispatches it via its multiaddr `dest_suffix` per
704    /// `docs/ADDRESSING.md`. The receiver doesn't consult any
705    /// subscription table or routing map - the address suffix is the
706    /// routing key. Two suffix shapes are supported:
707    ///   - `/site/<NodeSiteId>` → data-plane slot fill.
708    ///   - `/component/<cref>/op/<name>` → control-plane component
709    ///     dispatch.
710    fn route_envelope(
711        &mut self,
712        env: crate::envelope::WireEnvelope,
713        src_peer: Option<crate::ids::PeerId>,
714    ) -> Vec<EngineStep> {
715        // Sender-side back-pressure ingest. Inbound envelopes whose
716        // first fill carries the reserved
717        // `BackoffNoticePayload` type-hash are framework-internal -
718        // intercept them here, decode the payload, advise the
719        // sender-side `BackoffTable`, and short-circuit the normal
720        // data-plane / control-plane delivery so user Components
721        // never observe a notice envelope.
722        if env
723            .fills
724            .first()
725            .is_some_and(|f| f.type_hash == crate::framework::backoff_notice_type_hash())
726        {
727            return self.ingest_backoff_notice(env, src_peer);
728        }
729
730        let correlation = env.correlation.as_ref().map(|c| c.wire_req_id).unwrap_or(0);
731
732        // hook. If the inbound envelope's wire_req_id matches an
733        // in-flight outbound round trip we registered at dispatch,
734        // pop the sample + feed it into the RTT tracker so the
735        // hierarchical-fallback EMAs converge on real per-edge,
736        // per-site, per-chain, and global RTT distributions.
737        let mut response_from_site: Option<crate::ids::NodeSiteId> = None;
738        if correlation != 0 {
739            let now_ns = self.framework.scheduler.now_ns();
740            if let Some(sample) = self
741                .framework
742                .request_tracker
743                .observe_response(correlation, now_ns)
744            {
745                self.framework.rtt_tracker.observe_round_trip(
746                    sample.target_site,
747                    sample.chain,
748                    sample.elapsed_ns,
749                    now_ns,
750                );
751                response_from_site = Some(sample.target_site);
752            }
753        }
754
755        // piggyback. The sender attached EdgeRttReport entries
756        // describing its observed outgoing edges in the chain. We
757        // record each report against the entry for the sending
758        // site so multi-hop chain budgets can compose from this
759        // direct neighbor's table.
760        if let Some(from_site) = response_from_site {
761            for report in &env.edge_rtt_reports {
762                self.framework.rtt_tracker.ingest_reported_outgoing(
763                    from_site,
764                    crate::ids::NodeSiteId::from(report.next_hop_site_id),
765                    report.chain_id,
766                    report.srtt_ns,
767                    report.rttvar_ns,
768                    report.sample_count,
769                );
770            }
771        }
772
773        // Capture the inbound envelope's deadline budget + arrival
774        // timestamp so consumer ops (especially `wire.Send` while
775        // forwarding) can propagate them per Dapper.
776        let inbound_remaining_deadline_ns = if env.remaining_deadline_ns > 0 {
777            Some(env.remaining_deadline_ns)
778        } else {
779            None
780        };
781        let arrival_ns = self.framework.scheduler.now_ns();
782
783        let mut steps = Vec::new();
784        for (fill_index, fill) in env.fills.into_iter().enumerate() {
785            steps.extend(self.deliver_fill(
786                fill,
787                fill_index as u32,
788                correlation,
789                src_peer,
790                arrival_ns,
791                inbound_remaining_deadline_ns,
792            ));
793        }
794        steps
795    }
796
797    /// Dispatch one `SlotFill` per `docs/ADDRESSING.md`. Parses
798    /// `fill.dest_suffix` as a multiaddr and routes by the trailing
799    /// segment shape. `fill_index` is the fill's 0-based position
800    /// within the inbound envelope; surfaces on per-fill failure
801    /// events so subscribers can identify the failing fill when
802    /// the envelope partial-delivers.
803    fn deliver_fill(
804        &mut self,
805        fill: crate::envelope::SlotFill,
806        fill_index: u32,
807        wire_req_id: u64,
808        src_peer: Option<crate::ids::PeerId>,
809        arrival_ns: u64,
810        inbound_remaining_deadline_ns: Option<u64>,
811    ) -> Vec<EngineStep> {
812        let addr = match crate::framework::Address::from_bytes(&fill.dest_suffix) {
813            Ok(a) => a,
814            Err(e) => {
815                return vec![self.emit_wire_decode_failure(
816                    0,
817                    fill.payload.len(),
818                    format!("dest_suffix parse: {e}"),
819                )];
820            }
821        };
822
823        // Data-plane suffix: /site/<NodeSiteId>. The Site segment
824        // uniquely identifies the slot (NodeSiteIds are globally
825        // unique within a Node).
826        if let Some(site_id) = addr.site_id() {
827            return self.deliver_data_plane_fill(
828                site_id,
829                fill,
830                fill_index,
831                src_peer,
832                wire_req_id,
833                arrival_ns,
834                inbound_remaining_deadline_ns,
835            );
836        }
837
838        // Control-plane suffix: /component/<cref>/op/<name>.
839        if let (Some(cref), Some(op_name)) = (addr.component_ref(), addr.op_name()) {
840            let op_name = op_name.to_string();
841            return self.deliver_control_plane_fill(cref, op_name, fill, wire_req_id);
842        }
843
844        vec![self.emit_wire_decode_failure(
845            0,
846            fill.payload.len(),
847            "address shape neither data-plane nor control-plane".to_string(),
848        )]
849    }
850
851    /// Publish a `WireDecodeFailure` onto the bus and return the
852    /// matching `EngineStep`. The bus event lets in-process
853    /// telemetry subscribers react; the EngineStep surfaces the
854    /// same context to the host poll() caller.
855    fn emit_wire_decode_failure(
856        &mut self,
857        hash: u64,
858        payload_size: usize,
859        detail: String,
860    ) -> EngineStep {
861        self.bus.publish(crate::bus::NodeEvent::Infra(
862            crate::bus::InfraEvent::WireDecodeFailure {
863                hash,
864                payload_size,
865                detail: detail.clone(),
866            },
867        ));
868        EngineStep::WireDecodeFailed {
869            hash,
870            payload_size,
871            detail,
872        }
873    }
874
875    /// Data-plane delivery: decode the fill payload into a typed
876    /// `SlotValue` via the wire decoder registry, write it into the
877    /// addressed slot at a fresh `ExecId`, and push the slot's
878    /// downstream consumers onto the frontier. Walks each installed
879    /// graph's `consumers` map for the matching `NodeSiteId`.
880    ///
881    /// Per-fill failures (unknown type-hash, type mismatch against
882    /// the slot's declared wire type, decoder error) surface as a
883    /// `WireReceiveError` InfraEvent + matching `WireReceiveFailed`
884    /// EngineStep. The failing fill drops; sibling fills in the
885    /// same envelope still deliver (the caller continues iterating).
886    #[allow(clippy::too_many_arguments)]
887    fn deliver_data_plane_fill(
888        &mut self,
889        site_id: crate::ids::NodeSiteId,
890        mut fill: crate::envelope::SlotFill,
891        fill_index: u32,
892        src_peer: Option<crate::ids::PeerId>,
893        wire_req_id: u64,
894        arrival_ns: u64,
895        inbound_remaining_deadline_ns: Option<u64>,
896    ) -> Vec<EngineStep> {
897        // Resolve the consumer ops from each installed graph; a
898        // NodeSiteId belongs to at most one graph because IDs are
899        // globally unique, but we tolerate empty lookups.
900        let consumers: Vec<crate::ids::OpRef> = self
901            .graphs_iter()
902            .filter_map(|g| g.consumers.get(&site_id).cloned())
903            .flatten()
904            .collect();
905
906        // Resolve the typed `SlotValue` BEFORE allocating an
907        // ExecId or stamping inbound context: failure modes return
908        // a WireReceiveError step and the envelope's other fills
909        // continue to deliver without polluting the slot table.
910        // Trigger fills bypass the decoder lookup entirely.
911        let value: Box<dyn crate::slot_value::SlotValue> = if fill.trigger_only {
912            Box::new(crate::syscall::values::TriggerValue)
913        } else {
914            match self.decode_typed_fill(&mut fill, fill_index, site_id, src_peer) {
915                Ok(v) => v,
916                Err(step) => return vec![step],
917            }
918        };
919
920        let exec_id = self.allocate_exec_id();
921        // Stamp the inbound envelope context for this ExecId.
922        // Components access this through `RuntimeResourceRef` (RX
923        // gates filter on `src_peer`; `wire.Send` forwarding inside a
924        // chain reuses `wire_req_id` + propagates `remaining_deadline_ns`
925        // minus elapsed local time).
926        self.framework.inbound_contexts.insert(
927            exec_id,
928            crate::framework::InboundContext {
929                src_peer,
930                wire_req_id: if wire_req_id != 0 {
931                    Some(wire_req_id)
932                } else {
933                    None
934                },
935                arrival_ns: Some(arrival_ns),
936                remaining_deadline_ns: inbound_remaining_deadline_ns,
937            },
938        );
939        // `slot_write` routes through the engine's budget-release
940        // bookkeeping so a wire-receive overwriting a prior carrier
941        // releases the prior `charged_bytes()` against
942        // `ingress_bytes_in_flight`. Fresh-slot writes (the common
943        // case here, since `exec_id` is freshly allocated) hit the
944        // no-prior branch and incur the same cost as the raw
945        // `slot_table.insert`.
946        self.slot_write(site_id, exec_id, value);
947
948        // If site_id is a wire.Recv's payload site, also populate the
949        // paired sender site with PeerIdValue(src_peer) for the same
950        // ExecId. Downstream user ops read this as a graph value to
951        // identify provenance; reply-to is `g.net_out(name, sender, reply)`.
952        let sender_site: Option<crate::ids::NodeSiteId> = self
953            .graphs_iter()
954            .find_map(|g| g.recv_sender_sites.get(&site_id).copied());
955        if let (Some(sender_site), Some(peer)) = (sender_site, src_peer) {
956            let sender_value: Box<dyn crate::slot_value::SlotValue> =
957                Box::new(crate::syscall::values::PeerIdValue(peer));
958            self.exec
959                .slot_table
960                .insert((sender_site, exec_id), Some(sender_value));
961        }
962
963        for op_ref in consumers {
964            self.exec.frontier.push_back((op_ref, exec_id));
965        }
966        Vec::new()
967    }
968
969    /// Resolve a non-trigger data-plane fill into its typed
970    /// `SlotValue` carrier. The routing tree branches on the
971    /// destination slot's binding:
972    ///
973    /// - **Backend-bound slot** — the engine takes ownership of
974    ///   `fill.payload` via `std::mem::take` (zero-copy ownership
975    ///   transfer; `fill.payload` is already framework-owned from
976    ///   envelope decode) and hands it to the bound backend's
977    ///   `materialize_from_wire`. The typed tensor lands inside a
978    ///   `BackendTensorCarrier` whose `charged_bytes` + `backend_ref`
979    ///   are stamped to the engine-side accounting before the carrier
980    ///   is returned for slot-table install.
981    /// - **Framework-carrier slot** — the wire decoder registry
982    ///   resolves the `type_hash` to a bincode decoder; the decoded
983    ///   `SlotValue` rides on as-is.
984    ///
985    /// Failure modes surface as typed `WireReceiveError` InfraEvents
986    /// + matching `WireReceiveFailed` EngineSteps:
987    ///
988    /// - **TypeMismatch** — destination slot declares an expected
989    ///   wire-type hash via `GraphSlot::recv_wire_type_hash` and
990    ///   the fill's `type_hash` does not match. Checked first so a
991    ///   mis-typed payload never reaches the decoder.
992    /// - **UnknownTypeHash** — framework-carrier path only; no
993    ///   decoder is registered for the stamped hash.
994    /// - **DecodeFailed** — registered decoder ran and rejected
995    ///   the bytes (framework-carrier path).
996    /// - **BudgetExceeded** — admitting the bytes would push the
997    ///   engine over `NodeConfig::ingress_byte_budget`.
998    /// - **BackendMaterializeFailed** — the bound backend's typed
999    ///   `materialize_from_wire` returned `Err` (backend path).
1000    ///
1001    /// The `Err` carries the typed `EngineStep` the caller will
1002    /// surface to `Engine::poll`'s return value. `EngineStep` is
1003    /// load-bearing for the host's failure visibility surface, so
1004    /// boxing it here would force every caller through an
1005    /// indirection layer that adds no clarity for a single
1006    /// internal call site.
1007    ///
1008    /// **No per-fill scratch buffer.** Backend-mediated tensor fills
1009    /// move `fill.payload` into `materialize_from_wire` via
1010    /// `std::mem::take`; the empty `Vec<u8>` left behind drops with
1011    /// the `SlotFill`. The only memcpy on the tensor path is the
1012    /// one the backend chooses to do — or skips via zero-copy
1013    /// adoption (`ArrayD::from_shape_vec` when alignment permits).
1014    #[allow(clippy::result_large_err)]
1015    fn decode_typed_fill(
1016        &mut self,
1017        fill: &mut crate::envelope::SlotFill,
1018        fill_index: u32,
1019        site_id: crate::ids::NodeSiteId,
1020        src_peer: Option<crate::ids::PeerId>,
1021    ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1022        let expected_hash: Option<u64> = self
1023            .graphs_iter()
1024            .find_map(|g| g.recv_wire_type_hash.get(&site_id).copied());
1025        if let Some(expected) = expected_hash {
1026            if expected != fill.type_hash {
1027                return Err(self.emit_wire_receive_error(
1028                    src_peer,
1029                    fill_index,
1030                    fill.type_hash,
1031                    fill.payload.len(),
1032                    crate::bus::WireReceiveErrorKind::TypeMismatch {
1033                        expected_hash: expected,
1034                    },
1035                ));
1036            }
1037        }
1038
1039        // Resolve the destination slot's binding so the backend-bound
1040        // branch can fire before the framework-carrier registry
1041        // lookup. A site without a recv_site → slot_id mapping (no
1042        // role consumes the Recv payload) takes the framework path.
1043        let slot_id = self
1044            .graphs_iter()
1045            .find_map(|g| g.recv_site_to_slot_id.get(&site_id).copied());
1046        let role_ref = slot_id.and_then(|id| self.role_ref_for_slot_id(id));
1047
1048        // Budget guard (uniform across branches): pre-charge the
1049        // fill's payload length against `NodeConfig::ingress_byte_budget`
1050        // before invoking either decoder. Successful admission leaves
1051        // the charge in the counter; failure paths release before
1052        // returning.
1053        let byte_count = fill.payload.len();
1054        if let Err(reason) = self.try_charge(byte_count) {
1055            return Err(self.emit_wire_receive_error(
1056                src_peer,
1057                fill_index,
1058                fill.type_hash,
1059                byte_count,
1060                crate::bus::WireReceiveErrorKind::BudgetExceeded {
1061                    byte_count: reason.byte_count,
1062                    budget_remaining: reason.budget_remaining,
1063                },
1064            ));
1065        }
1066
1067        if let Some((crate::registry::ComponentRole::Backend, backend_ref)) = role_ref {
1068            return self.materialize_via_backend(
1069                fill,
1070                fill_index,
1071                src_peer,
1072                byte_count,
1073                backend_ref,
1074            );
1075        }
1076
1077        let Some(decoder) = crate::slot_value::wire_decoder_registry()
1078            .get(&fill.type_hash)
1079            .copied()
1080        else {
1081            self.release(byte_count);
1082            return Err(self.emit_wire_receive_error(
1083                src_peer,
1084                fill_index,
1085                fill.type_hash,
1086                fill.payload.len(),
1087                crate::bus::WireReceiveErrorKind::UnknownTypeHash,
1088            ));
1089        };
1090        decoder(&fill.payload).map_err(|e| {
1091            self.release(byte_count);
1092            self.emit_wire_receive_error(
1093                src_peer,
1094                fill_index,
1095                fill.type_hash,
1096                byte_count,
1097                crate::bus::WireReceiveErrorKind::DecodeFailed {
1098                    error_summary: e.to_string(),
1099                },
1100            )
1101        })
1102    }
1103
1104    /// Backend-bound branch of [`Self::decode_typed_fill`]. Hands the
1105    /// inbound bytes to the bound backend via the role-dispatch
1106    /// registry; wraps the typed `Self::Tensor` in a
1107    /// `BackendTensorCarrier` whose engine-side accounting fields
1108    /// (`charged_bytes`, `backend_ref`) are stamped before the
1109    /// carrier is returned. On error releases the byte charge and
1110    /// emits `BackendMaterializeFailed`.
1111    #[allow(clippy::result_large_err)]
1112    fn materialize_via_backend(
1113        &mut self,
1114        fill: &mut crate::envelope::SlotFill,
1115        fill_index: u32,
1116        src_peer: Option<crate::ids::PeerId>,
1117        byte_count: usize,
1118        backend_ref: crate::ids::ComponentRef,
1119    ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1120        // `mem::take` transfers ownership of `fill.payload` to the
1121        // backend at zero cost: the wire bytes are already
1122        // framework-owned (prost allocated them during envelope
1123        // decode), and the empty `Vec<u8>` left in `fill.payload`
1124        // drops with the `SlotFill`. No scratch buffer, no memcpy on
1125        // the framework side.
1126        let bytes = std::mem::take(&mut fill.payload);
1127        let type_hash = fill.type_hash;
1128
1129        // Take the backend component out of the Vec so dispatch can
1130        // borrow it without holding a long lease on `engine.components`.
1131        // Restore on the way out — even on error paths.
1132        let Some(mut taken) = self.take_component(backend_ref) else {
1133            self.release(byte_count);
1134            return Err(self.emit_wire_receive_error(
1135                src_peer,
1136                fill_index,
1137                type_hash,
1138                byte_count,
1139                crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1140                    backend_ref,
1141                    backend_error_summary: "backend component slot empty".to_string(),
1142                },
1143            ));
1144        };
1145
1146        // Reach the backend through the per-T dispatcher in
1147        // `role_dispatchers`. The dispatcher closes over the typed
1148        // `Self::Tensor` so the boxed `SlotValue` returned here is
1149        // already a `BackendTensorCarrier` (the derive bridge does
1150        // the wrap).
1151        let any: &mut dyn std::any::Any = taken.as_mut();
1152        let tid = (*any).type_id();
1153        let dispatcher = self.role_dispatchers.get(&tid).map(|d| d.materialize);
1154
1155        let result = if let Some(materialize) = dispatcher {
1156            (materialize)(any, type_hash, bytes)
1157        } else {
1158            Err(crate::slot_value::BackendMaterializeError {
1159                summary: "no BackendRuntime dispatcher registered".to_string(),
1160            })
1161        };
1162
1163        self.restore_component(backend_ref, taken);
1164
1165        match result {
1166            Ok(boxed) => {
1167                // Downcast to `BackendTensorCarrier` and stamp the
1168                // engine-side accounting fields the derive bridge left
1169                // as placeholders. The bridge constructs the carrier
1170                // with the typed clone / encode fn pointers; the
1171                // engine owns the budget counter and the backend
1172                // identity, so it fills them in here. The downcast
1173                // is infallible by the bridge's construction; any
1174                // hand-rolled `BackendRuntime::materialize_from_wire`
1175                // that returns a non-carrier `SlotValue` flows
1176                // through unchanged (no accounting stamp), which is
1177                // the right behaviour because non-carrier returns
1178                // never charge against the backend-tensor pool.
1179                let any_box = boxed.into_any_boxed();
1180                let final_boxed: Box<dyn crate::slot_value::SlotValue> =
1181                    match any_box.downcast::<crate::slot_value::BackendTensorCarrier>() {
1182                        Ok(mut carrier) => {
1183                            carrier.charged_bytes = byte_count;
1184                            carrier.backend_ref = backend_ref;
1185                            carrier
1186                        }
1187                        Err(other) => {
1188                            // The dispatcher returned a non-carrier
1189                            // `SlotValue`; route it through unchanged.
1190                            // `Box<dyn Any + Send + Sync>` downcasts
1191                            // back to `Box<dyn SlotValue>` only via a
1192                            // typed re-box, which we don't do — log
1193                            // and release the budget charge instead.
1194                            let _ = other;
1195                            self.release(byte_count);
1196                            return Err(self.emit_wire_receive_error(
1197                                src_peer,
1198                                fill_index,
1199                                type_hash,
1200                                byte_count,
1201                                crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1202                                    backend_ref,
1203                                    backend_error_summary:
1204                                        "backend bridge returned non-carrier SlotValue".to_string(),
1205                                },
1206                            ));
1207                        }
1208                    };
1209                Ok(final_boxed)
1210            }
1211            Err(e) => {
1212                self.release(byte_count);
1213                Err(self.emit_wire_receive_error(
1214                    src_peer,
1215                    fill_index,
1216                    type_hash,
1217                    byte_count,
1218                    crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1219                        backend_ref,
1220                        backend_error_summary: e.summary,
1221                    },
1222                ))
1223            }
1224        }
1225    }
1226
1227    /// Publish a `WireReceiveError` on the bus and return the
1228    /// matching `EngineStep`. Mirrors `emit_wire_decode_failure`
1229    /// for the per-fill typed-decode failure surface.
1230    fn emit_wire_receive_error(
1231        &mut self,
1232        src_peer: Option<crate::ids::PeerId>,
1233        fill_index: u32,
1234        actual_hash: u64,
1235        payload_size: usize,
1236        kind: crate::bus::WireReceiveErrorKind,
1237    ) -> EngineStep {
1238        self.bus.publish(crate::bus::NodeEvent::Infra(
1239            crate::bus::InfraEvent::WireReceiveError {
1240                src_peer,
1241                fill_index,
1242                actual_hash,
1243                payload_size,
1244                kind: kind.clone(),
1245            },
1246        ));
1247        EngineStep::WireReceiveFailed {
1248            src_peer,
1249            fill_index,
1250            actual_hash,
1251            payload_size,
1252            kind,
1253        }
1254    }
1255
    /// Control-plane delivery: invoke
    /// `component[cref].dispatch_atomic(op_name, [(payload,
    /// correlation, ...)], ctx)`. The component decodes the payload
    /// bytes via its own protocol logic.
    ///
    /// The dispatch receives exactly two named inputs: `"payload"`
    /// (the fill's raw bytes as a `BytesValue`) and `"correlation"`
    /// (`wire_req_id` as a `WireReqIdValue`).
    ///
    /// Always returns an empty `Vec`. If no component can be taken for
    /// `component_ref` the fill is dropped without any event, and the
    /// value returned by the dispatch call itself is discarded. The
    /// only state carried out of the call is the set of pending
    /// completions the component registered, which is appended to
    /// `self.exec.pending_completions`.
    fn deliver_control_plane_fill(
        &mut self,
        component_ref: crate::ids::ComponentRef,
        op_name: String,
        fill: crate::envelope::SlotFill,
        wire_req_id: u64,
    ) -> Vec<EngineStep> {
        let payload = BytesValue(fill.payload);
        let correlation = WireReqIdValue(wire_req_id);

        // Owned storage for the inputs; the dispatch call below only
        // sees borrowed `(&str, &dyn SlotValue)` views into it.
        let inputs_storage: Vec<(String, Box<dyn SlotValue>)> = vec![
            ("payload".to_string(), Box::new(payload)),
            ("correlation".to_string(), Box::new(correlation)),
        ];

        let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> = inputs_storage
            .iter()
            .map(|(n, h)| (n.as_str(), h.as_ref()))
            .collect();

        // D2 take-and-restore so we can split-borrow the rest of
        // `self.framework` / `self.bus` etc. while the dispatching
        // component is held exclusively.
        let Some(mut taken) = self.take_component(component_ref) else {
            // Nothing to dispatch to: the fill is dropped silently.
            return Vec::new();
        };

        // Assemble the runtime resource view handed to the component.
        // Each field borrows a distinct piece of engine state, which is
        // why the component had to be taken out first.
        let mut ctx = crate::runtime::RuntimeResourceRef {
            peers: crate::runtime::PeerCtx {
                gate: &mut self.framework.peer_state.gate,
                backoff: &mut self.framework.peer_state.backoff,
                governor: &mut self.framework.peer_state.governor,
                addresses: &mut self.framework.address_book,
                backpressure: &mut self.framework.peer_state.backpressure,
            },
            net: crate::runtime::NetCtx {
                outbound: &mut self.framework.outbound_queue,
                rtt: &mut self.framework.rtt_tracker,
                requests: &mut self.framework.request_tracker,
                dedup: &mut self.framework.inbound_dedup,
                pending_peer_resolve_failures: &mut self.framework.pending_peer_resolve_failures,
            },
            time: crate::runtime::TimeCtx {
                scheduler: &mut self.framework.scheduler,
            },
            syscall: crate::runtime::SyscallCtx {
                serialize_queue: &mut self.framework.serialize_queue,
                hold_table: &mut self.framework.hold_table,
                record_buffer: &mut self.framework.record_buffer,
                event_source: &mut self.framework.event_source,
                counters: &mut self.framework.counters,
                any_fired_groups: &mut self.framework.any_fired_groups,
                deadline_match_fired: &mut self.framework.deadline_match_fired,
                rng: &mut *self.framework.rng,
                pending_app_events: &mut self.framework.pending_app_events,
            },
            bus: &mut self.bus,
            ingress: std::sync::Arc::clone(&self.ingress),
            components: crate::runtime::ComponentsView {
                instances: Some(&self.components),
                slots: Some(&self.slots),
            },
            // Control-plane dispatch passes placeholder call-site
            // values: op_ref and exec_id are both built from 0, the
            // node attribute / metadata slices are empty, and no
            // inbound envelope context (peer, wire_req_id, arrival,
            // deadline) is supplied.
            current: crate::runtime::CurrentCallCtx {
                op_ref: crate::ids::OpRef::from(0u64),
                exec_id: crate::ids::ExecId::from(0u64),
                self_peer: self.self_peer,
                node_attributes: &[],
                node_metadata: &[],
                inbound: crate::runtime::InboundCtx {
                    src_peer: None,
                    wire_req_id: None,
                    arrival_ns: None,
                    remaining_deadline_ns: None,
                },
                pending_completions: Vec::new(),
                next_command_id: &mut self.exec.ids.next_command_id,
            },
        };

        // The dispatch call's return value is intentionally ignored.
        // NOTE(review): if it can signal an error, that error is not
        // surfaced to the host or the bus from here - confirm intended.
        let _ = crate::engine::invoke::call_protocol_dispatch_atomic(
            taken.as_mut(),
            &op_name,
            &inputs_for_dispatch,
            &mut ctx,
            &self.role_dispatchers,
        );
        // Pull the completions out of `ctx` and drop it explicitly so
        // its borrows of engine state end before `self.exec` is
        // mutated below.
        let captured = std::mem::take(&mut ctx.current.pending_completions);
        drop(ctx);
        self.exec.pending_completions.extend(captured);

        // Put the component back now that dispatch is finished.
        self.restore_component(component_ref, taken);

        Vec::new()
    }
1354
1355    /// Route a matured timer to its consumer.
1356    /// - `Sleep`/`Completion` fulfil a pending `CommandId`.
1357    /// - `Interval` re-pushes its owning Op onto the frontier at a
1358    ///   fresh `ExecId`; the Op's `invoke` re-schedules the next
1359    ///   firing and emits the periodic `TriggerValue` downstream.
1360    /// - `After` fulfils the parked `CommandId` (which the Op
1361    ///   suspended on) with a single `TriggerValue`.
1362    fn handle_matured_timer(
1363        &mut self,
1364        kind: crate::framework::scheduler::TimerKind,
1365    ) -> Vec<EngineStep> {
1366        match kind {
1367            TimerKind::Sleep(cmd_id) | TimerKind::Completion(cmd_id) => {
1368                self.handle_completion(cmd_id, Vec::new())
1369            }
1370            TimerKind::Interval { key, .. } => {
1371                let op_ref = crate::ids::OpRef::from(key);
1372                let exec_id = self.allocate_exec_id();
1373                self.exec.frontier.push_back((op_ref, exec_id));
1374                Vec::new()
1375            }
1376            TimerKind::After { key } => {
1377                let cmd_id = CommandId::from(key);
1378                let value: Box<dyn SlotValue> = Box::new(crate::syscall::values::TriggerValue);
1379                self.handle_completion(cmd_id, vec![("trigger".to_string(), value)])
1380            }
1381        }
1382    }
1383
1384    /// Merge a sender-claimed `src_peer_addresses` list into the
1385    /// receiver's `AddressBook` entry for `src_peer`. Empty list is
1386    /// a no-op (the sender chose not to advertise). The
1387    /// skip-on-unchanged guard compares the decoded list to the
1388    /// existing entry via slice equality and elides the write when
1389    /// they match — without this the receiver would rewrite the
1390    /// entry once per envelope, swamping the address book with
1391    /// idempotent updates under load.
1392    ///
1393    /// Returns `Err(SrcAddressMergeAllocError)` when the dedup
1394    /// buffer (S4) or the address-book peer dedup (S5) cannot
1395    /// reserve. The caller surfaces this as
1396    /// `WireReceiveError::AllocationFailed`; the address-book hint
1397    /// is best-effort under allocator pressure (the envelope's
1398    /// other fills still route).
1399    fn merge_src_peer_addresses(
1400        &mut self,
1401        src_peer: crate::ids::PeerId,
1402        claimed_bytes: &[Vec<u8>],
1403    ) -> Result<(), SrcAddressMergeAllocError> {
1404        if claimed_bytes.is_empty() {
1405            return Ok(());
1406        }
1407        // S4: dedup buffer for parsed Addresses. Use try_reserve_exact
1408        // so an exhausted allocator surfaces as AllocationFailed
1409        // rather than aborting the receiver.
1410        let mut claimed: Vec<crate::framework::Address> = Vec::new();
1411        let claim_count = claimed_bytes.len();
1412        if crate::fallible::try_reserve_exact(&mut claimed, claim_count).is_err() {
1413            return Err(SrcAddressMergeAllocError {
1414                byte_count: claim_count
1415                    .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1416                reason: crate::bus::AllocFailReason::HeapExhausted,
1417            });
1418        }
1419        for bytes in claimed_bytes {
1420            match crate::framework::Address::from_bytes(bytes) {
1421                Ok(addr) => claimed.push(addr),
1422                // Parse failure on one segment drops the hint without
1423                // touching the address book; the envelope's fills still
1424                // route via their own dest_suffix parsing.
1425                Err(_) => return Ok(()),
1426            }
1427        }
1428        if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1429            if existing == claimed.as_slice() {
1430                return Ok(());
1431            }
1432        }
1433        // S5: `add_peer` runs its own try_reserve_exact for the
1434        // peer-entry dedup buffer. Allocation failure surfaces here
1435        // as AddressBookError::AllocationFailed; map to the same
1436        // WireReceiveErrorKind::AllocationFailed. Other errors
1437        // (Full, EmptyAddressList) are deployment-level signals
1438        // already observable elsewhere — swallow as before.
1439        match self.framework.address_book.add_peer(src_peer, claimed) {
1440            Ok(()) => Ok(()),
1441            Err(crate::framework::AddressBookError::AllocationFailed { requested }) => {
1442                Err(SrcAddressMergeAllocError {
1443                    byte_count: requested
1444                        .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1445                    reason: crate::bus::AllocFailReason::HeapExhausted,
1446                })
1447            }
1448            Err(_) => Ok(()),
1449        }
1450    }
1451
1452    /// Merge a transport-observed source address into the
1453    /// receiver's `AddressBook` entry for `src_peer`. Skips the
1454    /// write when the entry already contains `addr` (slice
1455    /// containment check) so a steady stream of envelopes from the
1456    /// same observed endpoint costs at most one `register_address`
1457    /// call. When no entry exists yet — the sender-claimed merge
1458    /// upstream may have short-circuited on an empty list — the
1459    /// observed address bootstraps a fresh one via `add_peer`.
1460    fn merge_src_observed_address(
1461        &mut self,
1462        src_peer: crate::ids::PeerId,
1463        addr: crate::framework::Address,
1464    ) {
1465        if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1466            if existing.contains(&addr) {
1467                return;
1468            }
1469            let _ = self.framework.address_book.register_address(src_peer, addr);
1470            return;
1471        }
1472        let _ = self.framework.address_book.add_peer(src_peer, vec![addr]);
1473    }
1474
1475    /// Sender-side ingest of a `BackoffNotice` envelope. Called from
1476    /// `route_envelope` when the first fill's `type_hash` matches
1477    /// `backoff_notice_type_hash`. Decodes the payload, applies the
1478    /// remote-advised back-off via `BackoffTable::record_remote_advisory`,
1479    /// and records a `PeerGovernor::record_failure` so the existing
1480    /// 5-failure `LifecycleTransition::WentDown` path stays the single
1481    /// peer-down decision site. Returns no `EngineStep`s — the notice
1482    /// never reaches a user Component.
1483    ///
1484    /// When the source peer is unknown (the inbound `EnvelopeFrom`
1485    /// was synthesised by a transport that couldn't attribute the
1486    /// sender), the engine drops the notice silently. When the
1487    /// payload bytes fail to decode, it emits a
1488    /// `WireDecodeFailure` so operators observe the corruption.
1489    fn ingest_backoff_notice(
1490        &mut self,
1491        env: crate::envelope::WireEnvelope,
1492        src_peer: Option<crate::ids::PeerId>,
1493    ) -> Vec<EngineStep> {
1494        let Some(fill) = env.fills.into_iter().next() else {
1495            return Vec::new();
1496        };
1497        let Some(src_peer) = src_peer else {
1498            // No attributable sender; drop silently.
1499            return Vec::new();
1500        };
1501        let Some(payload) = crate::framework::BackoffNoticePayload::decode(&fill.payload) else {
1502            return vec![self.emit_wire_decode_failure(
1503                fill.type_hash,
1504                fill.payload.len(),
1505                "BackoffNoticePayload bincode decode failed".to_string(),
1506            )];
1507        };
1508
1509        let now_ns = self.framework.scheduler.now_ns();
1510        // Advise the sender-side BackoffTable using the receiver's
1511        // quoted delay (§5.2). The existing BackoffGateTx already
1512        // reads `should_retry(peer, now_ns)` and respects the new
1513        // `next_retry_ns`, so no new gate is needed.
1514        self.framework.peer_state.backoff.record_remote_advisory(
1515            src_peer,
1516            now_ns,
1517            payload.min_backoff_ns,
1518        );
1519        // Record a peer-governor failure so the existing 5-failure
1520        // `LifecycleTransition::WentDown` surfacing remains the
1521        // single down-decision path (§5.3).
1522        let transition = self
1523            .framework
1524            .peer_state
1525            .governor
1526            .record_failure(src_peer, now_ns);
1527        // Surface the WentDown lifecycle transition if the receipt
1528        // of this notice pushed the sender's local view of the peer
1529        // across the threshold. The bus event mirrors the existing
1530        // PeerSuspect/PeerDown surfacing path.
1531        if matches!(transition, crate::framework::LifecycleTransition::WentDown,) {
1532            // The existing PhiTransition path emits PeerDown by site;
1533            // here the trigger is the per-peer governor decision so
1534            // no site-level info is available. Telemetry on the
1535            // tracker entry remains via PeerHealth.
1536        }
1537        // Cause is informational on the sender side - it's already
1538        // logged at the receiver via InfraEvent::BackoffNoticeSent.
1539        let _ = payload.cause();
1540        Vec::new()
1541    }
1542
1543    /// Receiver-side back-pressure hook. When the ingress depth
1544    /// crosses the high-water mark (or
1545    /// the caller forces emission via `force = true` from the
1546    /// φ-accrual scan), consult the `BackpressureTracker` for the
1547    /// `src_peer` and - on `Decision::EmitNotice` - mint a
1548    /// `BackoffNotice` envelope back to the sender, push it onto
1549    /// `OutboundQueue`, and publish the matching
1550    /// `InfraEvent::BackoffNoticeSent`. The K-then-silent transition
1551    /// surfaces `InfraEvent::SilentDropActive` as well so operators
1552    /// see the fallback engage. Returns the resulting `EngineStep`s;
1553    /// the caller appends them to the polling step list.
1554    fn maybe_emit_backoff_notice(
1555        &mut self,
1556        src_peer: crate::ids::PeerId,
1557        cause: crate::framework::BackoffCause,
1558        hint_ns: Option<u64>,
1559    ) -> Vec<EngineStep> {
1560        // Pull config-driven mark; PhiAccrual + ExplicitDrop callers
1561        // bypass the queue-depth check because they were triggered
1562        // by an external signal (φ flip / Component reject).
1563        let force = !matches!(cause, crate::framework::BackoffCause::QueueFull);
1564        if !force {
1565            // Compare the pre-drain ingress-depth snapshot to the
1566            // configured high-water fraction of capacity. The
1567            // snapshot stays valid across every
1568            // `process_ingress_event` call inside the same poll
1569            // cycle — using the current `ingress.len()` would see
1570            // post-drain zero and never trip.
1571            let len = self.phase1_pre_drain_depth;
1572            let cap = self.ingress.capacity();
1573            if !self
1574                .framework
1575                .peer_state
1576                .backpressure
1577                .is_over_high_water(len, cap)
1578            {
1579                return Vec::new();
1580            }
1581        }
1582
1583        let now_ns = self.framework.scheduler.now_ns();
1584        // `hint_ns` for QueueFull is sized by the configured
1585        // min-notice interval (the BackpressureTracker enforces the
1586        // floor). PhiAccrual callers may pass a specific mean
1587        // inter-arrival hint; ExplicitDrop callers pass the
1588        // Component-supplied `BackpressureHint`.
1589        let min_hint = hint_ns.unwrap_or(0);
1590        let decision = self
1591            .framework
1592            .peer_state
1593            .backpressure
1594            .observe_overload(src_peer, cause, min_hint, now_ns);
1595
1596        let (min_backoff_ns, cause_chosen) = match decision {
1597            crate::framework::BackpressureDecision::EmitNotice {
1598                min_backoff_ns,
1599                cause,
1600            } => (min_backoff_ns, cause),
1601            crate::framework::BackpressureDecision::Suppress
1602            | crate::framework::BackpressureDecision::SilentDrop => return Vec::new(),
1603        };
1604
1605        // Build the notice envelope + push it on the outbound queue.
1606        let payload =
1607            crate::framework::BackoffNoticePayload::new(min_backoff_ns, cause_chosen, None);
1608        let envelope =
1609            crate::framework::build_backoff_notice_envelope(self.self_peer, src_peer, payload);
1610        self.framework.outbound_queue.push(envelope);
1611
1612        // Bus event for ops dashboards + Component authors that
1613        // want to react to local overload signals.
1614        self.bus.publish(crate::bus::NodeEvent::Infra(
1615            crate::bus::InfraEvent::BackoffNoticeSent {
1616                peer: src_peer,
1617                cause: cause_chosen,
1618                min_backoff_ns,
1619            },
1620        ));
1621
1622        // If this emission flipped the peer into silent-drop mode,
1623        // surface `SilentDropActive` once so operators see the
1624        // K-then-silent transition.
1625        if self
1626            .framework
1627            .peer_state
1628            .backpressure
1629            .is_silent_drop_active(src_peer)
1630        {
1631            self.bus.publish(crate::bus::NodeEvent::Infra(
1632                crate::bus::InfraEvent::SilentDropActive { peer: src_peer },
1633            ));
1634        }
1635
1636        Vec::new()
1637    }
1638}
1639
/// Reports whether a cycle has used up its op-invocation budget.
///
/// With `Some(cap)` the answer is `ops_invoked >= cap`. With `None`
/// the cap is disabled and the answer is always `false`.
#[inline]
fn budget_hit(budget: Option<usize>, ops_invoked: usize) -> bool {
    match budget {
        Some(cap) => ops_invoked >= cap,
        None => false,
    }
}
1646
1647/// Reservation failure surfaced by `merge_src_peer_addresses` so the
1648/// caller can mint a single `WireReceiveError::AllocationFailed`
1649/// `EngineStep` carrying the bytes the boundary tried to claim.
1650/// Covers both S4 (the claimed-address dedup buffer) and S5
1651/// (`AddressBook::add_peer`'s peer-entry dedup buffer).
1652struct SrcAddressMergeAllocError {
1653    /// Approximate bytes the failing reservation requested
1654    /// (`address_count * size_of::<Address>()`). Mirrored into the
1655    /// `WireReceiveError::AllocationFailed::byte_count` field for
1656    /// telemetry.
1657    byte_count: usize,
1658    /// Why the reservation failed. `HeapExhausted` for both sites;
1659    /// the boundary has no per-item cap (cap-driven rejection lives
1660    /// on the application-ingress path, not here).
1661    reason: crate::bus::AllocFailReason,
1662}
1663
1664#[cfg(test)]
1665#[path = "poll_recv_seed_tests.rs"]
1666mod poll_recv_seed_tests;
1667
1668#[cfg(test)]
1669#[path = "poll_bus_routing_tests.rs"]
1670mod poll_bus_routing_tests;
1671
1672#[cfg(test)]
1673#[path = "poll_ingress_handler_tests.rs"]
1674mod poll_ingress_handler_tests;
1675
1676#[cfg(test)]
1677#[path = "poll_budget_tests.rs"]
1678mod poll_budget_tests;
1679
1680#[cfg(test)]
1681#[path = "poll_async_error_tests.rs"]
1682mod poll_async_error_tests;
1683
1684#[cfg(test)]
1685#[path = "poll_wire_timeout_tests.rs"]
1686mod poll_wire_timeout_tests;
1687
1688#[cfg(test)]
1689#[path = "introspection_tests.rs"]
1690mod introspection_tests;
1691
1692#[cfg(test)]
1693#[path = "peer_governor_tests.rs"]
1694mod peer_governor_tests;
1695
1696#[cfg(test)]
1697#[path = "poll_backpressure_tests.rs"]
1698mod poll_backpressure_tests;
1699
1700#[cfg(test)]
1701#[path = "poll_src_peer_addresses_tests.rs"]
1702mod poll_src_peer_addresses_tests;
1703
1704#[cfg(test)]
1705#[path = "poll_observed_address_tests.rs"]
1706mod poll_observed_address_tests;
1707
1708#[cfg(test)]
1709#[path = "poll_typed_receive_tests.rs"]
1710mod poll_typed_receive_tests;
1711
1712#[cfg(test)]
1713#[path = "poll_ingress_alloc_tests.rs"]
1714mod poll_ingress_alloc_tests;
1715
1716#[cfg(test)]
1717#[path = "poll_backend_materialize_tests.rs"]
1718mod poll_backend_materialize_tests;