Skip to main content

bb_runtime/engine/
poll.rs

1//! 8-phase poll cycle + `handle_completion` per `docs/ENGINE.md`
2//! §7 + §9.
3use crate::engine::core::Engine;
4use crate::engine::step::EngineStep;
5use crate::framework::scheduler::TimerKind;
6use crate::ids::{CommandId, NodeSiteId};
7use crate::ingress::IngressEvent;
8use crate::slot_value::SlotValue;
9use crate::syscall::values::{BytesValue, WireReqIdValue};
10
11impl Engine {
12    /// Handle a CommandId completion per ENGINE.md §9.2.
13    /// Writes the values into the suspended Op's output sites +
14    /// pushes ready downstream consumers onto the frontier.
15    pub fn handle_completion(
16        &mut self,
17        cmd_id: CommandId,
18        values: Vec<(String, Box<dyn SlotValue>)>,
19    ) -> Vec<EngineStep> {
20        let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
21            // No matching suspension - completion arrived for a
22            // CommandId the engine doesn't know. Silently drop;
23            return Vec::new();
24        };
25
26        let mut steps = Vec::new();
27
28        // Move output_sites out of pending (pending is owned via
29        // pending_async.remove above). Helpers take &[NodeSiteId]
30        // borrows; the final step consumes by value.
31        let sites: Vec<NodeSiteId> = pending.output_sites;
32        for (i, (_name, value)) in values.into_iter().enumerate() {
33            if let Some(site) = sites.get(i).copied() {
34                self.exec
35                    .slot_table
36                    .insert((site, pending.exec_id), Some(value));
37            }
38        }
39
40        // Push ready downstream consumers.
41        self.push_ready_consumers(&sites, pending.exec_id);
42
43        // Function-call splice: async completion arriving inside a
44        // body's derived ExecId forwards to the caller's slots per
45        // ENGINE.md §8.4. No-op when there's no pending call context.
46        self.forward_outputs_to_caller(&sites, pending.exec_id);
47
48        // Surface top-level function outputs (no in-function consumer)
49        // as AppEvents - same path as `Engine::write_outputs`, so
50        // async-completion writes participate in the canonical
51        // function-signature → engine I/O contract.
52        self.surface_top_level_outputs(&sites, pending.exec_id);
53
54        steps.push(EngineStep::OpCompleted {
55            op_ref: pending.op_ref,
56            exec_id: pending.exec_id,
57            sites_written: sites,
58        });
59        steps
60    }
61
62    /// Handle a transport-reported failure for a suspended
63    /// `CommandId`. The Op that was waiting on `cmd_id` fails
64    /// through the existing `OpFailed` path (bus
65    /// `InfraEvent::OpFailure` + `EngineStep::OpFailed`). Use this
66    /// when the host's transport adapter learns that the remote
67    /// side failed to produce a result - the framework no longer
68    /// silently swallows the outcome.
69    pub fn handle_completion_failed(
70        &mut self,
71        cmd_id: CommandId,
72        error: crate::bus::OpError,
73    ) -> Vec<EngineStep> {
74        let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
75            // No matching suspension - failure arrived for a
76            // CommandId the engine doesn't know. Silently drop;
77            // the host's transport reconciliation should have
78            // caught this earlier.
79            return Vec::new();
80        };
81        vec![self.fail_op(
82            pending.op_ref,
83            pending.exec_id,
84            crate::bus::OpErrorKind::RemoteFailed,
85            "completion_failed",
86            error.detail,
87        )]
88    }
89
90    /// Expire any pending async suspensions whose `deadline_ns` is
91    /// past `scheduler.now_ns()`. Each expired suspension fails via
92    /// the existing `OpFailed` surface with
93    /// `OpError("deadline exceeded")`. Returns the resulting steps.
94    /// Called from Phase 5 of the poll cycle before draining
95    /// `pending_completions`, so deadline failures land in the
96    /// same poll where they expire.
97    fn expire_deadlines(&mut self) -> Vec<EngineStep> {
98        let now_ns = self.framework.scheduler.now_ns();
99        let expired: Vec<CommandId> = self
100            .exec
101            .pending_async
102            .iter()
103            .filter_map(|(cmd, p)| match p.deadline_ns {
104                Some(d) if d <= now_ns => Some(*cmd),
105                _ => None,
106            })
107            .collect();
108        let mut steps = Vec::new();
109        for cmd in expired {
110            if let Some(p) = self.exec.pending_async.remove(&cmd) {
111                steps.push(self.fail_op(
112                    p.op_ref,
113                    p.exec_id,
114                    crate::bus::OpErrorKind::Timeout,
115                    "deadline_exceeded",
116                    "deadline exceeded".to_string(),
117                ));
118            }
119        }
120
121        // Drain stale in-flight wire requests. Each evicted entry
122        // surfaces as `EngineStep::WireTimeout` for observability;
123        // if it carried a `parked_op`, fail the originator's local
124        // continuation with "chain timeout" so it doesn't sit
125        // parked forever.
126        let drained = self.framework.request_tracker.drain_stale(now_ns);
127        for (wire_req_id, entry) in drained {
128            steps.push(EngineStep::WireTimeout {
129                wire_req_id,
130                target_site: entry.target_site,
131                started_at_ns: entry.started_at_ns,
132                parked_op: entry.parked_op,
133            });
134            if let Some(cmd) = entry.parked_op {
135                if let Some(p) = self.exec.pending_async.remove(&cmd) {
136                    steps.push(self.fail_op(
137                        p.op_ref,
138                        p.exec_id,
139                        crate::bus::OpErrorKind::Timeout,
140                        "chain_timeout",
141                        "chain timeout".to_string(),
142                    ));
143                }
144            }
145        }
146        steps
147    }
148
149    /// 8-phase poll cycle per ENGINE.md §7.
150    pub fn poll(&mut self) -> Vec<EngineStep> {
151        let _poll_span = tracing::debug_span!("engine.poll").entered();
152        // GC executions that finished in the previous cycle. The
153        // one-cycle delay lets the host read completion state via
154        // `slot_at` between polls; production consumers read via the
155        // `EngineStep` stream so the delay is invisible to them.
156        self.gc_completed_executions();
157        let mut steps = Vec::new();
158        // Per-poll counter used by `cycle_op_budget` enforcement.
159        // Increments once per `invoke_one` call across Phases 2, 6,
160        // and 7. When the budget is hit, the current drain loop
161        // breaks and `CycleBudgetExceeded` is appended once.
162        let mut ops_invoked: usize = 0;
163        let mut budget_exceeded = false;
164
165        // Host-driven bootstrap kicks land via `Node::run_bootstrap`
166        // before this poll runs (empty-slice install-order kick or
167        // per-target staging). Both paths arm `bootstrap.pending`
168        // and seed the body's OpRefs onto the frontier so the
169        // drain phases below pick them up. Install no longer
170        // auto-seeds — the host owns when bootstrap starts.
171        let bootstrap_was_pending = self.bootstrap.pending;
172        // Per-phase BootstrapComplete steps accumulate here as each
173        // queued key drains. The final `WaitingOnBootstrap` /
174        // terminal `BootstrapComplete` decision below uses
175        // `bootstrap_was_pending` + post-drain `bootstrap_pending` to
176        // detect a partial drain (queue not yet empty, async pending).
177        let mut bootstrap_phases_completed: usize = 0;
178
179        // While bootstrap is pending the ingress drain consumes only
180        // events that can advance the bootstrap call (its async
181        // completions, transport failures). Body-side events
182        // (AppEvent, EnvelopeFrom, Invoke) requeue so the host
183        // observes the same pre-bootstrap delivery order on the
184        // cycle after BootstrapComplete fires. The loop re-drains
185        // when bootstrap completes mid-pass so body events queued
186        // before bootstrap finished still process in this cycle.
187        //
188        // Snapshot the pre-drain depth so per-envelope handlers see
189        // the receiver's overload signal even after the drain runs the
190        // queue to zero.
191        self.phase1_pre_drain_depth = self.ingress.len();
192        {
193            let _phase1 = tracing::debug_span!("engine.phase1_ingress").entered();
194            loop {
195                let was_pending = self.bootstrap.pending;
196                let ingress_events = self.ingress.drain_all();
197                if ingress_events.is_empty() {
198                    break;
199                }
200                for event in ingress_events {
201                    if self.bootstrap.pending && self.is_body_phase_ingress(&event) {
202                        let _ = self.ingress.push(event);
203                        continue;
204                    }
205                    steps.extend(self.process_ingress_event(event));
206                    if self.maybe_complete_bootstrap() {
207                        bootstrap_phases_completed += 1;
208                        self.seed_bootstrap_call();
209                    }
210                }
211                if !was_pending || self.bootstrap.pending {
212                    break;
213                }
214            }
215        }
216
217        {
218            let _phase2 = tracing::debug_span!("engine.phase2_frontier_drain").entered();
219            loop {
220                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
221                    let step = self.invoke_one(op_ref, exec_id);
222                    steps.push(step);
223                    ops_invoked += 1;
224                    if budget_hit(self.cycle_op_budget, ops_invoked) {
225                        budget_exceeded = true;
226                        break;
227                    }
228                }
229                // If a queued bootstrap phase just drained, re-seed
230                // the next one and cascade in-cycle so the host sees
231                // every BootstrapComplete + the body's first ops in a
232                // single poll when budget permits.
233                if self.maybe_complete_bootstrap() {
234                    bootstrap_phases_completed += 1;
235                    if budget_exceeded {
236                        break;
237                    }
238                    if !self.seed_bootstrap_call() {
239                        break;
240                    }
241                } else {
242                    break;
243                }
244            }
245        }
246
247        // Before draining, surface any FIFO drops accumulated since
248        // the last poll as an `InfraEvent::BusOverflow`. Publishing
249        // before drain keeps the event in this cycle's routing pass.
250        let bus_dropped = self.bus.take_dropped_count();
251        if bus_dropped > 0 {
252            self.bus.publish(crate::bus::NodeEvent::Infra(
253                crate::bus::InfraEvent::BusOverflow { count: bus_dropped },
254            ));
255        }
256        // For every NodeEvent on the bus, derive its `kind` string
257        // (via NodeEvent::kind()) and look up the subscribed
258        // `NodeSiteId`s. For each site, write a `TriggerValue` at a
259        // fresh `ExecId` and push the site's downstream consumers
260        // onto the frontier. This matches the wire delivery
261        // semantics per `docs/ADDRESSING.md`.
262        let events = self.bus.drain();
263        if !events.is_empty() {
264            let mut to_seed: Vec<crate::ids::NodeSiteId> = Vec::new();
265            for event in events {
266                let kind = event.kind();
267                let Some(sites) = self.event_subscriptions.get(kind) else {
268                    continue;
269                };
270                to_seed.extend(sites.iter().copied());
271            }
272            for site in to_seed {
273                let exec_id = self.allocate_exec_id();
274                let value: Box<dyn crate::slot_value::SlotValue> =
275                    Box::new(crate::syscall::values::TriggerValue);
276                self.exec.slot_table.insert((site, exec_id), Some(value));
277                let consumers: Vec<crate::ids::OpRef> = self
278                    .graphs_iter()
279                    .filter_map(|g| g.consumers.get(&site).cloned())
280                    .flatten()
281                    .collect();
282                for op_ref in consumers {
283                    self.exec.frontier.push_back((op_ref, exec_id));
284                }
285            }
286        }
287
288        let now_ns = self.framework.scheduler.now_ns();
289        let matured = self.framework.scheduler.poll_matured(now_ns);
290        for kind in matured {
291            self.handle_matured_timer(kind);
292        }
293
294        // Engine-side deadline scan runs first so an expired
295        // suspension fails this cycle even if a (now-stale)
296        // completion is also queued.
297        {
298            let _phase5 = tracing::debug_span!("engine.phase5_completions").entered();
299            steps.extend(self.expire_deadlines());
300            let completions = std::mem::take(&mut self.exec.pending_completions);
301            for c in completions {
302                steps.extend(self.handle_completion(c.cmd_id, c.results));
303            }
304            if self.maybe_complete_bootstrap() {
305                bootstrap_phases_completed += 1;
306                self.seed_bootstrap_call();
307            }
308        }
309
310        // tracker entry and publish bus events on state changes.
311        {
312            let now_ns = self.framework.scheduler.now_ns();
313            let transitions = self.framework.rtt_tracker.scan_phi(now_ns);
314            for transition in transitions {
315                let event = match transition {
316                    crate::framework::rtt_tracker::PhiTransition::Suspect { site, phi } => {
317                        crate::bus::InfraEvent::PeerSuspect { site, phi }
318                    }
319                    crate::framework::rtt_tracker::PhiTransition::Down { site, phi } => {
320                        crate::bus::InfraEvent::PeerDown { site, phi }
321                    }
322                    crate::framework::rtt_tracker::PhiTransition::Live { site } => {
323                        crate::bus::InfraEvent::PeerLive { site }
324                    }
325                };
326                self.bus.publish(crate::bus::NodeEvent::Infra(event));
327            }
328        }
329
330        if !budget_exceeded {
331            loop {
332                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
333                    let step = self.invoke_one(op_ref, exec_id);
334                    steps.push(step);
335                    ops_invoked += 1;
336                    if budget_hit(self.cycle_op_budget, ops_invoked) {
337                        budget_exceeded = true;
338                        break;
339                    }
340                }
341                if self.maybe_complete_bootstrap() {
342                    bootstrap_phases_completed += 1;
343                    if budget_exceeded {
344                        break;
345                    }
346                    if !self.seed_bootstrap_call() {
347                        break;
348                    }
349                } else {
350                    break;
351                }
352            }
353        }
354
355        // For each phase queued by the host via
356        // `Engine::fire_lifecycle(phase)`, push every enrolled
357        // `LifecyclePhase` op onto the frontier with a fresh ExecId
358        // and emit a `LifecycleFired` step. Cascade-drain so newly
359        // pushed ops invoke in this same poll cycle.
360        let fired: Vec<String> = std::mem::take(&mut self.fired_phases);
361        for phase in &fired {
362            let op_refs: Vec<crate::ids::OpRef> =
363                self.lifecycle_table.get(phase).cloned().unwrap_or_default();
364            let pairs: Vec<(crate::ids::OpRef, crate::ids::ExecId)> = op_refs
365                .into_iter()
366                .map(|op_ref| (op_ref, self.allocate_exec_id()))
367                .collect();
368            for (op_ref, exec_id) in pairs {
369                self.exec.frontier.push_back((op_ref, exec_id));
370            }
371            steps.push(EngineStep::LifecycleFired {
372                phase: phase.clone(),
373            });
374        }
375        if !budget_exceeded {
376            loop {
377                while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
378                    let step = self.invoke_one(op_ref, exec_id);
379                    steps.push(step);
380                    ops_invoked += 1;
381                    if budget_hit(self.cycle_op_budget, ops_invoked) {
382                        budget_exceeded = true;
383                        break;
384                    }
385                }
386                if self.maybe_complete_bootstrap() {
387                    bootstrap_phases_completed += 1;
388                    if budget_exceeded {
389                        break;
390                    }
391                    if !self.seed_bootstrap_call() {
392                        break;
393                    }
394                } else {
395                    break;
396                }
397            }
398        }
399
400        let _phase8 = tracing::debug_span!("engine.phase8_outbound").entered();
401        for env in self.framework.outbound_queue.drain_all() {
402            steps.push(EngineStep::SendEnvelope(env));
403        }
404        // Surface peer-resolution failures captured by the wire
405        // syscall during this poll. Each entry becomes a dedicated
406        // EngineStep::PeerResolveFailed; the matching bus event was
407        // already published by the syscall, so subscribers got the
408        // routable mirror in real time.
409        for (peer, op_ref) in self.framework.pending_peer_resolve_failures.drain(..) {
410            steps.push(EngineStep::PeerResolveFailed {
411                peer,
412                op_ref,
413                exec_id: crate::ids::ExecId::from(0u64),
414            });
415        }
416        // Emit a single `OutboundDropped` step capturing FIFO drops
417        // that accumulated since the previous poll (e.g., when push
418        // exceeded `max_outbound_queue`).
419        let dropped = self.framework.outbound_queue.take_dropped_count();
420        if dropped > 0 {
421            steps.push(EngineStep::OutboundDropped { count: dropped });
422        }
423        // Surface a single `CycleBudgetExceeded` at the end so the
424        // host knows to re-poll. The step is appended after all
425        // observable per-op steps so the budget signal trails the
426        // in-cycle work it bounded.
427        if budget_exceeded {
428            steps.push(EngineStep::CycleBudgetExceeded { ops_invoked });
429        }
430        // Bootstrap state observable. Each queued bootstrap key that
431        // drained during this cycle emits its own `BootstrapComplete`
432        // — multi-target installs surface one signal per target in
433        // install order. `WaitingOnBootstrap` lands when the
434        // *currently* in-flight bootstrap op suspended on async
435        // completion and the host must drive the resumption before
436        // re-polling. Body-phase ops are gated from firing while
437        // `bootstrap_pending` is set.
438        if bootstrap_was_pending {
439            for _ in 0..bootstrap_phases_completed {
440                steps.push(EngineStep::BootstrapComplete);
441            }
442            if self.bootstrap.pending {
443                steps.push(EngineStep::WaitingOnBootstrap);
444            }
445        }
446        // Drain AppEmit / AppNotify syscall outputs into
447        // EngineStep::AppEvent. `Emit` carries serialized payload
448        // bytes; `Notify` is a marker-only event (empty `value_bytes`).
449        for ev in std::mem::take(&mut self.framework.pending_app_events) {
450            let (module_name, topic, value_bytes) = match ev {
451                crate::bus::AppEvent::Emit { name, value_bytes } => {
452                    (String::new(), name, value_bytes)
453                }
454                crate::bus::AppEvent::Notify { name } => (String::new(), name, Vec::new()),
455            };
456            steps.push(EngineStep::AppEvent {
457                module_name,
458                topic,
459                value_bytes,
460            });
461        }
462
463        steps
464    }
465
466    /// Whether an ingress event would seed body-phase work. The
467    /// bootstrap gate requeues these while the bootstrap call is
468    /// outstanding so app-event delivery + envelope routing
469    /// observe the post-bootstrap engine state. Bootstrap-resuming
470    /// completions, transport failures, and host-injected timer
471    /// matures bypass the gate so the bootstrap call can progress.
472    fn is_body_phase_ingress(&self, event: &IngressEvent) -> bool {
473        matches!(
474            event,
475            IngressEvent::AppEvent { .. }
476                | IngressEvent::EnvelopeFrom { .. }
477                | IngressEvent::Invoke { .. }
478        )
479    }
480
481    /// Ingress-event router. Dispatches each event variant to its
482    /// handler: envelopes route to `deliver_inbound_internal`,
483    /// completions to `handle_completion`, app events to the bus,
484    /// matured timers to `handle_matured_timer`, invoke events to
485    /// the per-module entry point.
486    fn process_ingress_event(&mut self, event: IngressEvent) -> Vec<EngineStep> {
487        match event {
488            IngressEvent::Completion { cmd_id, results } => {
489                // The host's transport pre-decodes opaque payloads
490                // and hands them as `Vec<Vec<u8>>`. The engine wraps
491                // each entry as a `BytesValue` and forwards to
492                // `handle_completion`, which writes the slots and
493                // pushes downstream consumers.
494                let typed_results: Vec<(String, Box<dyn crate::slot_value::SlotValue>)> = results
495                    .into_iter()
496                    .enumerate()
497                    .map(|(i, bytes)| {
498                        let value = crate::syscall::values::BytesValue(bytes);
499                        (
500                            format!("out_{i}"),
501                            Box::new(value) as Box<dyn crate::slot_value::SlotValue>,
502                        )
503                    })
504                    .collect();
505                self.handle_completion(cmd_id, typed_results)
506            }
507            IngressEvent::EnvelopeFrom {
508                src_peer,
509                envelope,
510                src_observed_address,
511            } => {
512                // Backpressure pre-flight: silent-drop senders bypass
513                // dispatch; others route normally and then check the
514                // post-pop ingress depth against the high-water mark
515                // to emit a single BackoffNotice.
516                if self
517                    .framework
518                    .peer_state
519                    .backpressure
520                    .is_silent_drop_active(src_peer)
521                {
522                    return Vec::new();
523                }
524                // Order: claimed (envelope) first so the entry
525                // exists for the observed-address registration step.
526                // Observed wins for NAT-translated cases because it
527                // appends a fresh address the claimed snapshot
528                // cannot know.
529                //
530                // The address-book hint is best-effort under allocator
531                // pressure: if the dedup buffer cannot be reserved we
532                // drop the hint, emit `WireReceiveError::AllocationFailed`,
533                // and continue routing — fills do not depend on the
534                // address book.
535                let mut steps = Vec::new();
536                if let Err(alloc) =
537                    self.merge_src_peer_addresses(src_peer, &envelope.src_peer_addresses)
538                {
539                    steps.push(self.emit_wire_receive_error(
540                        Some(src_peer),
541                        0,
542                        0,
543                        alloc.byte_count,
544                        crate::bus::WireReceiveErrorKind::AllocationFailed {
545                            byte_count: alloc.byte_count,
546                            reason: alloc.reason,
547                        },
548                    ));
549                }
550                if let Some(observed) = src_observed_address {
551                    self.merge_src_observed_address(src_peer, observed);
552                }
553                steps.extend(self.route_envelope(envelope, Some(src_peer)));
554                let backoff_steps = self.maybe_emit_backoff_notice(
555                    src_peer,
556                    crate::framework::BackoffCause::QueueFull,
557                    None,
558                );
559                steps.extend(backoff_steps);
560                steps
561            }
562            IngressEvent::AppEvent {
563                module_name,
564                input_name,
565                value_bytes,
566            } => self.deliver_app_event(module_name, input_name, value_bytes),
567            IngressEvent::Invoke {
568                module_name,
569                inputs,
570                exec_id,
571            } => self.deliver_invoke(module_name, inputs, exec_id),
572            IngressEvent::TimerMatured { at_ns } => {
573                self.framework.scheduler.set_now(at_ns);
574                let matured = self.framework.scheduler.poll_matured(at_ns);
575                let mut out = Vec::new();
576                for kind in matured {
577                    out.extend(self.handle_matured_timer(kind));
578                }
579                out
580            }
581            IngressEvent::CompletionFailed { cmd_id, detail } => {
582                // Async-completion FAILURE: route to the typed
583                // OpFailed path via handle_completion_failed so the
584                // parked op fails as itself rather than as a
585                // success-bytes payload.
586                self.handle_completion_failed(
587                    cmd_id,
588                    crate::bus::OpError {
589                        detail,
590                        ..Default::default()
591                    },
592                )
593            }
594            IngressEvent::SendFailed {
595                wire_req_id,
596                peer: _peer,
597                reason: _reason,
598            } => {
599                // Transport-side delivery failure. Consumes the
600                // in-flight registration so the request tracker doesn't
601                // leak the entry; the parked originator op's failure is
602                // surfaced by the wire-timeout drain (`drain_stale`)
603                // rather than waiting for the TTL to elapse.
604                let now_ns = self.framework.scheduler.now_ns();
605                let _ = self
606                    .framework
607                    .request_tracker
608                    .observe_response(wire_req_id, now_ns);
609                Vec::new()
610            }
611            IngressEvent::AppIngressError {
612                source,
613                byte_count,
614                kind,
615            } => {
616                // Cross-thread bridge for `CompletionSink::complete`
617                // exceeding the per-completion result cap. Re-publish
618                // on the bus so subscribers see the rejection
619                // alongside the synchronous emissions from
620                // `Node::deliver_event` / `Node::invoke`.
621                self.bus.publish(crate::bus::NodeEvent::Infra(
622                    crate::bus::InfraEvent::AppIngressError {
623                        source,
624                        byte_count,
625                        kind,
626                    },
627                ));
628                Vec::new()
629            }
630        }
631    }
632
633    /// Deliver an inbound `AppEvent`: look up the addressed graph,
634    /// resolve the `input_name` to a `NodeSiteId`, wrap the bytes as
635    /// a `BytesValue`, seed the slot at a fresh `ExecId`, and push
636    /// ready downstream consumers onto the frontier. Surfaces an
637    /// observable `EngineStep::AppEvent` so the host can confirm
638    /// delivery even if no consumer exists yet.
639    fn deliver_app_event(
640        &mut self,
641        module_name: String,
642        input_name: String,
643        value_bytes: Vec<u8>,
644    ) -> Vec<EngineStep> {
645        let step = EngineStep::AppEvent {
646            module_name: module_name.clone(),
647            topic: input_name.clone(),
648            value_bytes: value_bytes.clone(),
649        };
650        let Some(graph) = self.graph(&module_name) else {
651            return vec![step];
652        };
653        let Some(&site) = graph.site_names.get(&input_name) else {
654            return vec![step];
655        };
656        let exec_id = self.allocate_exec_id();
657        let value = crate::syscall::values::BytesValue(value_bytes);
658        self.exec
659            .slot_table
660            .insert((site, exec_id), Some(Box::new(value)));
661        self.push_ready_consumers(&[site], exec_id);
662        vec![step]
663    }
664
665    /// Deliver an inbound `Invoke`: seed every `(input_name,
666    /// value_bytes)` pair into the addressed graph's matching site
667    /// at the supplied `exec_id`, then push the ready consumers
668    /// onto the frontier. Unknown modules / unknown input names are
669    /// silent no-ops (the host can detect via subsequent polls
670    /// producing no steps).
671    fn deliver_invoke(
672        &mut self,
673        module_name: String,
674        inputs: Vec<(String, Vec<u8>)>,
675        exec_id: crate::ids::ExecId,
676    ) -> Vec<EngineStep> {
677        let Some(graph) = self.graph(&module_name) else {
678            return Vec::new();
679        };
680        let mut seeded_sites: Vec<crate::ids::NodeSiteId> = Vec::new();
681        let pairs: Vec<(crate::ids::NodeSiteId, Vec<u8>)> = inputs
682            .into_iter()
683            .filter_map(|(name, bytes)| graph.site_names.get(&name).map(|&site| (site, bytes)))
684            .collect();
685        for (site, bytes) in pairs {
686            let value = crate::syscall::values::BytesValue(bytes);
687            self.exec
688                .slot_table
689                .insert((site, exec_id), Some(Box::new(value)));
690            seeded_sites.push(site);
691        }
692        if !seeded_sites.is_empty() {
693            self.push_ready_consumers(&seeded_sites, exec_id);
694        }
695        Vec::new()
696    }
697
698    /// Route a single inbound envelope. Iterates each `SlotFill` and
699    /// dispatches it via its multiaddr `dest_suffix` per
700    /// `docs/ADDRESSING.md`. The receiver doesn't consult any
701    /// subscription table or routing map - the address suffix is the
702    /// routing key. Two suffix shapes are supported:
703    ///   - `/site/<NodeSiteId>` → data-plane slot fill.
704    ///   - `/component/<cref>/op/<name>` → control-plane component
705    ///     dispatch.
706    fn route_envelope(
707        &mut self,
708        env: crate::envelope::WireEnvelope,
709        src_peer: Option<crate::ids::PeerId>,
710    ) -> Vec<EngineStep> {
711        // Sender-side back-pressure ingest. Inbound envelopes whose
712        // first fill carries the reserved
713        // `BackoffNoticePayload` type-hash are framework-internal -
714        // intercept them here, decode the payload, advise the
715        // sender-side `BackoffTable`, and short-circuit the normal
716        // data-plane / control-plane delivery so user Components
717        // never observe a notice envelope.
718        if env
719            .fills
720            .first()
721            .is_some_and(|f| f.type_hash == crate::framework::backoff_notice_type_hash())
722        {
723            return self.ingest_backoff_notice(env, src_peer);
724        }
725
726        let correlation = env.correlation.as_ref().map(|c| c.wire_req_id).unwrap_or(0);
727
728        // hook. If the inbound envelope's wire_req_id matches an
729        // in-flight outbound round trip we registered at dispatch,
730        // pop the sample + feed it into the RTT tracker so the
731        // hierarchical-fallback EMAs converge on real per-edge,
732        // per-site, per-chain, and global RTT distributions.
733        let mut response_from_site: Option<crate::ids::NodeSiteId> = None;
734        if correlation != 0 {
735            let now_ns = self.framework.scheduler.now_ns();
736            if let Some(sample) = self
737                .framework
738                .request_tracker
739                .observe_response(correlation, now_ns)
740            {
741                self.framework.rtt_tracker.observe_round_trip(
742                    sample.target_site,
743                    sample.chain,
744                    sample.elapsed_ns,
745                    now_ns,
746                );
747                response_from_site = Some(sample.target_site);
748            }
749        }
750
751        // piggyback. The sender attached EdgeRttReport entries
752        // describing its observed outgoing edges in the chain. We
753        // record each report against the entry for the sending
754        // site so multi-hop chain budgets can compose from this
755        // direct neighbor's table.
756        if let Some(from_site) = response_from_site {
757            for report in &env.edge_rtt_reports {
758                self.framework.rtt_tracker.ingest_reported_outgoing(
759                    from_site,
760                    crate::ids::NodeSiteId::from(report.next_hop_site_id),
761                    report.chain_id,
762                    report.srtt_ns,
763                    report.rttvar_ns,
764                    report.sample_count,
765                );
766            }
767        }
768
769        // Capture the inbound envelope's deadline budget + arrival
770        // timestamp so consumer ops (especially `wire.Send` while
771        // forwarding) can propagate them per Dapper.
772        let inbound_remaining_deadline_ns = if env.remaining_deadline_ns > 0 {
773            Some(env.remaining_deadline_ns)
774        } else {
775            None
776        };
777        let arrival_ns = self.framework.scheduler.now_ns();
778
779        let mut steps = Vec::new();
780        for (fill_index, fill) in env.fills.into_iter().enumerate() {
781            steps.extend(self.deliver_fill(
782                fill,
783                fill_index as u32,
784                correlation,
785                src_peer,
786                arrival_ns,
787                inbound_remaining_deadline_ns,
788            ));
789        }
790        steps
791    }
792
793    /// Dispatch one `SlotFill` per `docs/ADDRESSING.md`. Parses
794    /// `fill.dest_suffix` as a multiaddr and routes by the trailing
795    /// segment shape. `fill_index` is the fill's 0-based position
796    /// within the inbound envelope; surfaces on per-fill failure
797    /// events so subscribers can identify the failing fill when
798    /// the envelope partial-delivers.
799    fn deliver_fill(
800        &mut self,
801        fill: crate::envelope::SlotFill,
802        fill_index: u32,
803        wire_req_id: u64,
804        src_peer: Option<crate::ids::PeerId>,
805        arrival_ns: u64,
806        inbound_remaining_deadline_ns: Option<u64>,
807    ) -> Vec<EngineStep> {
808        let addr = match crate::framework::Address::from_bytes(&fill.dest_suffix) {
809            Ok(a) => a,
810            Err(e) => {
811                return vec![self.emit_wire_decode_failure(
812                    0,
813                    fill.payload.len(),
814                    format!("dest_suffix parse: {e}"),
815                )];
816            }
817        };
818
819        // Data-plane suffix: /site/<NodeSiteId>. The Site segment
820        // uniquely identifies the slot (NodeSiteIds are globally
821        // unique within a Node).
822        if let Some(site_id) = addr.site_id() {
823            return self.deliver_data_plane_fill(
824                site_id,
825                fill,
826                fill_index,
827                src_peer,
828                wire_req_id,
829                arrival_ns,
830                inbound_remaining_deadline_ns,
831            );
832        }
833
834        // Control-plane suffix: /component/<cref>/op/<name>.
835        if let (Some(cref), Some(op_name)) = (addr.component_ref(), addr.op_name()) {
836            let op_name = op_name.to_string();
837            return self.deliver_control_plane_fill(cref, op_name, fill, wire_req_id);
838        }
839
840        vec![self.emit_wire_decode_failure(
841            0,
842            fill.payload.len(),
843            "address shape neither data-plane nor control-plane".to_string(),
844        )]
845    }
846
847    /// Publish a `WireDecodeFailure` onto the bus and return the
848    /// matching `EngineStep`. The bus event lets in-process
849    /// telemetry subscribers react; the EngineStep surfaces the
850    /// same context to the host poll() caller.
851    fn emit_wire_decode_failure(
852        &mut self,
853        hash: u64,
854        payload_size: usize,
855        detail: String,
856    ) -> EngineStep {
857        self.bus.publish(crate::bus::NodeEvent::Infra(
858            crate::bus::InfraEvent::WireDecodeFailure {
859                hash,
860                payload_size,
861                detail: detail.clone(),
862            },
863        ));
864        EngineStep::WireDecodeFailed {
865            hash,
866            payload_size,
867            detail,
868        }
869    }
870
871    /// Data-plane delivery: decode the fill payload into a typed
872    /// `SlotValue` via the wire decoder registry, write it into the
873    /// addressed slot at a fresh `ExecId`, and push the slot's
874    /// downstream consumers onto the frontier. Walks each installed
875    /// graph's `consumers` map for the matching `NodeSiteId`.
876    ///
877    /// Per-fill failures (unknown type-hash, type mismatch against
878    /// the slot's declared wire type, decoder error) surface as a
879    /// `WireReceiveError` InfraEvent + matching `WireReceiveFailed`
880    /// EngineStep. The failing fill drops; sibling fills in the
881    /// same envelope still deliver (the caller continues iterating).
882    #[allow(clippy::too_many_arguments)]
883    fn deliver_data_plane_fill(
884        &mut self,
885        site_id: crate::ids::NodeSiteId,
886        mut fill: crate::envelope::SlotFill,
887        fill_index: u32,
888        src_peer: Option<crate::ids::PeerId>,
889        wire_req_id: u64,
890        arrival_ns: u64,
891        inbound_remaining_deadline_ns: Option<u64>,
892    ) -> Vec<EngineStep> {
893        // Resolve the consumer ops from each installed graph; a
894        // NodeSiteId belongs to at most one graph because IDs are
895        // globally unique, but we tolerate empty lookups.
896        let consumers: Vec<crate::ids::OpRef> = self
897            .graphs_iter()
898            .filter_map(|g| g.consumers.get(&site_id).cloned())
899            .flatten()
900            .collect();
901
902        // Resolve the typed `SlotValue` BEFORE allocating an
903        // ExecId or stamping inbound context: failure modes return
904        // a WireReceiveError step and the envelope's other fills
905        // continue to deliver without polluting the slot table.
906        // Trigger fills bypass the decoder lookup entirely.
907        let value: Box<dyn crate::slot_value::SlotValue> = if fill.trigger_only {
908            Box::new(crate::syscall::values::TriggerValue)
909        } else {
910            match self.decode_typed_fill(&mut fill, fill_index, site_id, src_peer) {
911                Ok(v) => v,
912                Err(step) => return vec![step],
913            }
914        };
915
916        let exec_id = self.allocate_exec_id();
917        // Stamp the inbound envelope context for this ExecId.
918        // Components access this through `RuntimeResourceRef` (RX
919        // gates filter on `src_peer`; `wire.Send` forwarding inside a
920        // chain reuses `wire_req_id` + propagates `remaining_deadline_ns`
921        // minus elapsed local time).
922        self.framework.inbound_contexts.insert(
923            exec_id,
924            crate::framework::InboundContext {
925                src_peer,
926                wire_req_id: if wire_req_id != 0 {
927                    Some(wire_req_id)
928                } else {
929                    None
930                },
931                arrival_ns: Some(arrival_ns),
932                remaining_deadline_ns: inbound_remaining_deadline_ns,
933            },
934        );
935        // `slot_write` routes through the engine's budget-release
936        // bookkeeping so a wire-receive overwriting a prior carrier
937        // releases the prior `charged_bytes()` against
938        // `ingress_bytes_in_flight`. Fresh-slot writes (the common
939        // case here, since `exec_id` is freshly allocated) hit the
940        // no-prior branch and incur the same cost as the raw
941        // `slot_table.insert`.
942        self.slot_write(site_id, exec_id, value);
943
944        // If site_id is a wire.Recv's payload site, also populate the
945        // paired sender site with PeerIdValue(src_peer) for the same
946        // ExecId. Downstream user ops read this as a graph value to
947        // identify provenance; reply-to is `g.net_out(name, sender, reply)`.
948        let sender_site: Option<crate::ids::NodeSiteId> = self
949            .graphs_iter()
950            .find_map(|g| g.recv_sender_sites.get(&site_id).copied());
951        if let (Some(sender_site), Some(peer)) = (sender_site, src_peer) {
952            let sender_value: Box<dyn crate::slot_value::SlotValue> =
953                Box::new(crate::syscall::values::PeerIdValue(peer));
954            self.exec
955                .slot_table
956                .insert((sender_site, exec_id), Some(sender_value));
957        }
958
959        for op_ref in consumers {
960            self.exec.frontier.push_back((op_ref, exec_id));
961        }
962        Vec::new()
963    }
964
965    /// Resolve a non-trigger data-plane fill into its typed
966    /// `SlotValue` carrier. The routing tree branches on the
967    /// destination slot's binding:
968    ///
969    /// - **Backend-bound slot** — the engine takes ownership of
970    ///   `fill.payload` via `std::mem::take` (zero-copy ownership
971    ///   transfer; `fill.payload` is already framework-owned from
972    ///   envelope decode) and hands it to the bound backend's
973    ///   `materialize_from_wire`. The typed tensor lands inside a
974    ///   `BackendTensorCarrier` whose `charged_bytes` + `backend_ref`
975    ///   are stamped to the engine-side accounting before the carrier
976    ///   is returned for slot-table install.
977    /// - **Framework-carrier slot** — the wire decoder registry
978    ///   resolves the `type_hash` to a bincode decoder; the decoded
979    ///   `SlotValue` rides on as-is.
980    ///
981    /// Failure modes surface as typed `WireReceiveError` InfraEvents
982    /// + matching `WireReceiveFailed` EngineSteps:
983    ///
984    /// - **TypeMismatch** — destination slot declares an expected
985    ///   wire-type hash via `GraphSlot::recv_wire_type_hash` and
986    ///   the fill's `type_hash` does not match. Checked first so a
987    ///   mis-typed payload never reaches the decoder.
988    /// - **UnknownTypeHash** — framework-carrier path only; no
989    ///   decoder is registered for the stamped hash.
990    /// - **DecodeFailed** — registered decoder ran and rejected
991    ///   the bytes (framework-carrier path).
992    /// - **BudgetExceeded** — admitting the bytes would push the
993    ///   engine over `NodeConfig::ingress_byte_budget`.
994    /// - **BackendMaterializeFailed** — the bound backend's typed
995    ///   `materialize_from_wire` returned `Err` (backend path).
996    ///
997    /// The `Err` carries the typed `EngineStep` the caller will
998    /// surface to `Engine::poll`'s return value. `EngineStep` is
999    /// load-bearing for the host's failure visibility surface, so
1000    /// boxing it here would force every caller through an
1001    /// indirection layer that adds no clarity for a single
1002    /// internal call site.
1003    ///
1004    /// **No per-fill scratch buffer.** Backend-mediated tensor fills
1005    /// move `fill.payload` into `materialize_from_wire` via
1006    /// `std::mem::take`; the empty `Vec<u8>` left behind drops with
1007    /// the `SlotFill`. The only memcpy on the tensor path is the
1008    /// one the backend chooses to do — or skips via zero-copy
1009    /// adoption (`ArrayD::from_shape_vec` when alignment permits).
1010    #[allow(clippy::result_large_err)]
1011    fn decode_typed_fill(
1012        &mut self,
1013        fill: &mut crate::envelope::SlotFill,
1014        fill_index: u32,
1015        site_id: crate::ids::NodeSiteId,
1016        src_peer: Option<crate::ids::PeerId>,
1017    ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1018        let expected_hash: Option<u64> = self
1019            .graphs_iter()
1020            .find_map(|g| g.recv_wire_type_hash.get(&site_id).copied());
1021        if let Some(expected) = expected_hash {
1022            if expected != fill.type_hash {
1023                return Err(self.emit_wire_receive_error(
1024                    src_peer,
1025                    fill_index,
1026                    fill.type_hash,
1027                    fill.payload.len(),
1028                    crate::bus::WireReceiveErrorKind::TypeMismatch {
1029                        expected_hash: expected,
1030                    },
1031                ));
1032            }
1033        }
1034
1035        // Resolve the destination slot's binding so the backend-bound
1036        // branch can fire before the framework-carrier registry
1037        // lookup. A site without a recv_site → slot_id mapping (no
1038        // role consumes the Recv payload) takes the framework path.
1039        let slot_id = self
1040            .graphs_iter()
1041            .find_map(|g| g.recv_site_to_slot_id.get(&site_id).copied());
1042        let role_ref = slot_id.and_then(|id| self.role_ref_for_slot_id(id));
1043
1044        // Budget guard (uniform across branches): pre-charge the
1045        // fill's payload length against `NodeConfig::ingress_byte_budget`
1046        // before invoking either decoder. Successful admission leaves
1047        // the charge in the counter; failure paths release before
1048        // returning.
1049        let byte_count = fill.payload.len();
1050        if let Err(reason) = self.try_charge(byte_count) {
1051            return Err(self.emit_wire_receive_error(
1052                src_peer,
1053                fill_index,
1054                fill.type_hash,
1055                byte_count,
1056                crate::bus::WireReceiveErrorKind::BudgetExceeded {
1057                    byte_count: reason.byte_count,
1058                    budget_remaining: reason.budget_remaining,
1059                },
1060            ));
1061        }
1062
1063        if let Some((crate::registry::ComponentRole::Backend, backend_ref)) = role_ref {
1064            return self.materialize_via_backend(
1065                fill,
1066                fill_index,
1067                src_peer,
1068                byte_count,
1069                backend_ref,
1070            );
1071        }
1072
1073        let Some(decoder) = crate::slot_value::wire_decoder_registry()
1074            .get(&fill.type_hash)
1075            .copied()
1076        else {
1077            self.release(byte_count);
1078            return Err(self.emit_wire_receive_error(
1079                src_peer,
1080                fill_index,
1081                fill.type_hash,
1082                fill.payload.len(),
1083                crate::bus::WireReceiveErrorKind::UnknownTypeHash,
1084            ));
1085        };
1086        decoder(&fill.payload).map_err(|e| {
1087            self.release(byte_count);
1088            self.emit_wire_receive_error(
1089                src_peer,
1090                fill_index,
1091                fill.type_hash,
1092                byte_count,
1093                crate::bus::WireReceiveErrorKind::DecodeFailed {
1094                    error_summary: e.to_string(),
1095                },
1096            )
1097        })
1098    }
1099
1100    /// Backend-bound branch of [`Self::decode_typed_fill`]. Hands the
1101    /// inbound bytes to the bound backend via the role-dispatch
1102    /// registry; wraps the typed `Self::Tensor` in a
1103    /// `BackendTensorCarrier` whose engine-side accounting fields
1104    /// (`charged_bytes`, `backend_ref`) are stamped before the
1105    /// carrier is returned. On error releases the byte charge and
1106    /// emits `BackendMaterializeFailed`.
1107    #[allow(clippy::result_large_err)]
1108    fn materialize_via_backend(
1109        &mut self,
1110        fill: &mut crate::envelope::SlotFill,
1111        fill_index: u32,
1112        src_peer: Option<crate::ids::PeerId>,
1113        byte_count: usize,
1114        backend_ref: crate::ids::ComponentRef,
1115    ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1116        // `mem::take` transfers ownership of `fill.payload` to the
1117        // backend at zero cost: the wire bytes are already
1118        // framework-owned (prost allocated them during envelope
1119        // decode), and the empty `Vec<u8>` left in `fill.payload`
1120        // drops with the `SlotFill`. No scratch buffer, no memcpy on
1121        // the framework side.
1122        let bytes = std::mem::take(&mut fill.payload);
1123        let type_hash = fill.type_hash;
1124
1125        // Take the backend component out of the Vec so dispatch can
1126        // borrow it without holding a long lease on `engine.components`.
1127        // Restore on the way out — even on error paths.
1128        let Some(mut taken) = self.take_component(backend_ref) else {
1129            self.release(byte_count);
1130            return Err(self.emit_wire_receive_error(
1131                src_peer,
1132                fill_index,
1133                type_hash,
1134                byte_count,
1135                crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1136                    backend_ref,
1137                    backend_error_summary: "backend component slot empty".to_string(),
1138                },
1139            ));
1140        };
1141
1142        // Reach the backend through the per-T dispatcher in
1143        // `role_dispatchers`. The dispatcher closes over the typed
1144        // `Self::Tensor` so the boxed `SlotValue` returned here is
1145        // already a `BackendTensorCarrier` (the derive bridge does
1146        // the wrap).
1147        let any: &mut dyn std::any::Any = taken.as_mut();
1148        let tid = (*any).type_id();
1149        let dispatcher = self.role_dispatchers.get(&tid).map(|d| d.materialize);
1150
1151        let result = if let Some(materialize) = dispatcher {
1152            (materialize)(any, type_hash, bytes)
1153        } else {
1154            Err(crate::slot_value::BackendMaterializeError {
1155                summary: "no BackendRuntime dispatcher registered".to_string(),
1156            })
1157        };
1158
1159        self.restore_component(backend_ref, taken);
1160
1161        match result {
1162            Ok(boxed) => {
1163                // Downcast to `BackendTensorCarrier` and stamp the
1164                // engine-side accounting fields the derive bridge left
1165                // as placeholders. The bridge constructs the carrier
1166                // with the typed clone / encode fn pointers; the
1167                // engine owns the budget counter and the backend
1168                // identity, so it fills them in here. The downcast
1169                // is infallible by the bridge's construction; any
1170                // hand-rolled `BackendRuntime::materialize_from_wire`
1171                // that returns a non-carrier `SlotValue` flows
1172                // through unchanged (no accounting stamp), which is
1173                // the right behaviour because non-carrier returns
1174                // never charge against the backend-tensor pool.
1175                let any_box = boxed.into_any_boxed();
1176                let final_boxed: Box<dyn crate::slot_value::SlotValue> =
1177                    match any_box.downcast::<crate::slot_value::BackendTensorCarrier>() {
1178                        Ok(mut carrier) => {
1179                            carrier.charged_bytes = byte_count;
1180                            carrier.backend_ref = backend_ref;
1181                            carrier
1182                        }
1183                        Err(other) => {
1184                            // The dispatcher returned a non-carrier
1185                            // `SlotValue`; route it through unchanged.
1186                            // `Box<dyn Any + Send + Sync>` downcasts
1187                            // back to `Box<dyn SlotValue>` only via a
1188                            // typed re-box, which we don't do — log
1189                            // and release the budget charge instead.
1190                            let _ = other;
1191                            self.release(byte_count);
1192                            return Err(self.emit_wire_receive_error(
1193                                src_peer,
1194                                fill_index,
1195                                type_hash,
1196                                byte_count,
1197                                crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1198                                    backend_ref,
1199                                    backend_error_summary:
1200                                        "backend bridge returned non-carrier SlotValue".to_string(),
1201                                },
1202                            ));
1203                        }
1204                    };
1205                Ok(final_boxed)
1206            }
1207            Err(e) => {
1208                self.release(byte_count);
1209                Err(self.emit_wire_receive_error(
1210                    src_peer,
1211                    fill_index,
1212                    type_hash,
1213                    byte_count,
1214                    crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1215                        backend_ref,
1216                        backend_error_summary: e.summary,
1217                    },
1218                ))
1219            }
1220        }
1221    }
1222
1223    /// Publish a `WireReceiveError` on the bus and return the
1224    /// matching `EngineStep`. Mirrors `emit_wire_decode_failure`
1225    /// for the per-fill typed-decode failure surface.
1226    fn emit_wire_receive_error(
1227        &mut self,
1228        src_peer: Option<crate::ids::PeerId>,
1229        fill_index: u32,
1230        actual_hash: u64,
1231        payload_size: usize,
1232        kind: crate::bus::WireReceiveErrorKind,
1233    ) -> EngineStep {
1234        self.bus.publish(crate::bus::NodeEvent::Infra(
1235            crate::bus::InfraEvent::WireReceiveError {
1236                src_peer,
1237                fill_index,
1238                actual_hash,
1239                payload_size,
1240                kind: kind.clone(),
1241            },
1242        ));
1243        EngineStep::WireReceiveFailed {
1244            src_peer,
1245            fill_index,
1246            actual_hash,
1247            payload_size,
1248            kind,
1249        }
1250    }
1251
1252    /// Control-plane delivery: invoke
1253    /// `component[cref].dispatch_atomic(op_name, [(payload,
1254    /// correlation, ...)], ctx)`. The component decodes the payload
1255    /// bytes via its own protocol logic.
1256    fn deliver_control_plane_fill(
1257        &mut self,
1258        component_ref: crate::ids::ComponentRef,
1259        op_name: String,
1260        fill: crate::envelope::SlotFill,
1261        wire_req_id: u64,
1262    ) -> Vec<EngineStep> {
1263        let payload = BytesValue(fill.payload);
1264        let correlation = WireReqIdValue(wire_req_id);
1265
1266        let inputs_storage: Vec<(String, Box<dyn SlotValue>)> = vec![
1267            ("payload".to_string(), Box::new(payload)),
1268            ("correlation".to_string(), Box::new(correlation)),
1269        ];
1270
1271        let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> = inputs_storage
1272            .iter()
1273            .map(|(n, h)| (n.as_str(), h.as_ref()))
1274            .collect();
1275
1276        // D2 take-and-restore so we can split-borrow the rest of
1277        // `self.framework` / `self.bus` etc. while the dispatching
1278        // component is held exclusively.
1279        let Some(mut taken) = self.take_component(component_ref) else {
1280            return Vec::new();
1281        };
1282
1283        let mut ctx = crate::runtime::RuntimeResourceRef {
1284            peers: crate::runtime::PeerCtx {
1285                gate: &mut self.framework.peer_state.gate,
1286                backoff: &mut self.framework.peer_state.backoff,
1287                governor: &mut self.framework.peer_state.governor,
1288                addresses: &mut self.framework.address_book,
1289                backpressure: &mut self.framework.peer_state.backpressure,
1290            },
1291            net: crate::runtime::NetCtx {
1292                outbound: &mut self.framework.outbound_queue,
1293                rtt: &mut self.framework.rtt_tracker,
1294                requests: &mut self.framework.request_tracker,
1295                dedup: &mut self.framework.inbound_dedup,
1296                pending_peer_resolve_failures: &mut self.framework.pending_peer_resolve_failures,
1297            },
1298            time: crate::runtime::TimeCtx {
1299                scheduler: &mut self.framework.scheduler,
1300            },
1301            syscall: crate::runtime::SyscallCtx {
1302                serialize_queue: &mut self.framework.serialize_queue,
1303                hold_table: &mut self.framework.hold_table,
1304                record_buffer: &mut self.framework.record_buffer,
1305                event_source: &mut self.framework.event_source,
1306                counters: &mut self.framework.counters,
1307                any_fired_groups: &mut self.framework.any_fired_groups,
1308                deadline_match_fired: &mut self.framework.deadline_match_fired,
1309                rng: &mut *self.framework.rng,
1310                pending_app_events: &mut self.framework.pending_app_events,
1311            },
1312            bus: &mut self.bus,
1313            ingress: std::sync::Arc::clone(&self.ingress),
1314            components: crate::runtime::ComponentsView {
1315                instances: Some(&self.components),
1316                slots: Some(&self.slots),
1317            },
1318            current: crate::runtime::CurrentCallCtx {
1319                op_ref: crate::ids::OpRef::from(0u64),
1320                exec_id: crate::ids::ExecId::from(0u64),
1321                self_peer: self.self_peer,
1322                node_attributes: &[],
1323                node_metadata: &[],
1324                inbound: crate::runtime::InboundCtx {
1325                    src_peer: None,
1326                    wire_req_id: None,
1327                    arrival_ns: None,
1328                    remaining_deadline_ns: None,
1329                },
1330                pending_completions: Vec::new(),
1331                next_command_id: &mut self.exec.ids.next_command_id,
1332            },
1333        };
1334
1335        let _ = crate::engine::invoke::call_protocol_dispatch_atomic(
1336            taken.as_mut(),
1337            &op_name,
1338            &inputs_for_dispatch,
1339            &mut ctx,
1340            &self.role_dispatchers,
1341        );
1342        let captured = std::mem::take(&mut ctx.current.pending_completions);
1343        drop(ctx);
1344        self.exec.pending_completions.extend(captured);
1345
1346        self.restore_component(component_ref, taken);
1347
1348        Vec::new()
1349    }
1350
1351    /// Route a matured timer to its consumer.
1352    /// - `Sleep`/`Completion` fulfil a pending `CommandId`.
1353    /// - `Interval` re-pushes its owning Op onto the frontier at a
1354    ///   fresh `ExecId`; the Op's `invoke` re-schedules the next
1355    ///   firing and emits the periodic `TriggerValue` downstream.
1356    /// - `After` fulfils the parked `CommandId` (which the Op
1357    ///   suspended on) with a single `TriggerValue`.
1358    fn handle_matured_timer(
1359        &mut self,
1360        kind: crate::framework::scheduler::TimerKind,
1361    ) -> Vec<EngineStep> {
1362        match kind {
1363            TimerKind::Sleep(cmd_id) | TimerKind::Completion(cmd_id) => {
1364                self.handle_completion(cmd_id, Vec::new())
1365            }
1366            TimerKind::Interval { key, .. } => {
1367                let op_ref = crate::ids::OpRef::from(key);
1368                let exec_id = self.allocate_exec_id();
1369                self.exec.frontier.push_back((op_ref, exec_id));
1370                Vec::new()
1371            }
1372            TimerKind::After { key } => {
1373                let cmd_id = CommandId::from(key);
1374                let value: Box<dyn SlotValue> = Box::new(crate::syscall::values::TriggerValue);
1375                self.handle_completion(cmd_id, vec![("trigger".to_string(), value)])
1376            }
1377        }
1378    }
1379
1380    /// Merge a sender-claimed `src_peer_addresses` list into the
1381    /// receiver's `AddressBook` entry for `src_peer`. Empty list is
1382    /// a no-op (the sender chose not to advertise). The
1383    /// skip-on-unchanged guard compares the decoded list to the
1384    /// existing entry via slice equality and elides the write when
1385    /// they match — without this the receiver would rewrite the
1386    /// entry once per envelope, swamping the address book with
1387    /// idempotent updates under load.
1388    ///
1389    /// Returns `Err(SrcAddressMergeAllocError)` when the dedup
1390    /// buffer (S4) or the address-book peer dedup (S5) cannot
1391    /// reserve. The caller surfaces this as
1392    /// `WireReceiveError::AllocationFailed`; the address-book hint
1393    /// is best-effort under allocator pressure (the envelope's
1394    /// other fills still route).
1395    fn merge_src_peer_addresses(
1396        &mut self,
1397        src_peer: crate::ids::PeerId,
1398        claimed_bytes: &[Vec<u8>],
1399    ) -> Result<(), SrcAddressMergeAllocError> {
1400        if claimed_bytes.is_empty() {
1401            return Ok(());
1402        }
1403        // S4: dedup buffer for parsed Addresses. Use try_reserve_exact
1404        // so an exhausted allocator surfaces as AllocationFailed
1405        // rather than aborting the receiver.
1406        let mut claimed: Vec<crate::framework::Address> = Vec::new();
1407        let claim_count = claimed_bytes.len();
1408        if crate::fallible::try_reserve_exact(&mut claimed, claim_count).is_err() {
1409            return Err(SrcAddressMergeAllocError {
1410                byte_count: claim_count
1411                    .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1412                reason: crate::bus::AllocFailReason::HeapExhausted,
1413            });
1414        }
1415        for bytes in claimed_bytes {
1416            match crate::framework::Address::from_bytes(bytes) {
1417                Ok(addr) => claimed.push(addr),
1418                // Parse failure on one segment drops the hint without
1419                // touching the address book; the envelope's fills still
1420                // route via their own dest_suffix parsing.
1421                Err(_) => return Ok(()),
1422            }
1423        }
1424        if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1425            if existing == claimed.as_slice() {
1426                return Ok(());
1427            }
1428        }
1429        // S5: `add_peer` runs its own try_reserve_exact for the
1430        // peer-entry dedup buffer. Allocation failure surfaces here
1431        // as AddressBookError::AllocationFailed; map to the same
1432        // WireReceiveErrorKind::AllocationFailed. Other errors
1433        // (Full, EmptyAddressList) are deployment-level signals
1434        // already observable elsewhere — swallow as before.
1435        match self.framework.address_book.add_peer(src_peer, claimed) {
1436            Ok(()) => Ok(()),
1437            Err(crate::framework::AddressBookError::AllocationFailed { requested }) => {
1438                Err(SrcAddressMergeAllocError {
1439                    byte_count: requested
1440                        .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1441                    reason: crate::bus::AllocFailReason::HeapExhausted,
1442                })
1443            }
1444            Err(_) => Ok(()),
1445        }
1446    }
1447
1448    /// Merge a transport-observed source address into the
1449    /// receiver's `AddressBook` entry for `src_peer`. Skips the
1450    /// write when the entry already contains `addr` (slice
1451    /// containment check) so a steady stream of envelopes from the
1452    /// same observed endpoint costs at most one `register_address`
1453    /// call. When no entry exists yet — the sender-claimed merge
1454    /// upstream may have short-circuited on an empty list — the
1455    /// observed address bootstraps a fresh one via `add_peer`.
1456    fn merge_src_observed_address(
1457        &mut self,
1458        src_peer: crate::ids::PeerId,
1459        addr: crate::framework::Address,
1460    ) {
1461        if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1462            if existing.contains(&addr) {
1463                return;
1464            }
1465            let _ = self.framework.address_book.register_address(src_peer, addr);
1466            return;
1467        }
1468        let _ = self.framework.address_book.add_peer(src_peer, vec![addr]);
1469    }
1470
1471    /// Sender-side ingest of a `BackoffNotice` envelope. Called from
1472    /// `route_envelope` when the first fill's `type_hash` matches
1473    /// `backoff_notice_type_hash`. Decodes the payload, applies the
1474    /// remote-advised back-off via `BackoffTable::record_remote_advisory`,
1475    /// and records a `PeerGovernor::record_failure` so the existing
1476    /// 5-failure `LifecycleTransition::WentDown` path stays the single
1477    /// peer-down decision site. Returns no `EngineStep`s — the notice
1478    /// never reaches a user Component.
1479    ///
1480    /// When the source peer is unknown (the inbound `EnvelopeFrom`
1481    /// was synthesised by a transport that couldn't attribute the
1482    /// sender), the engine drops the notice silently. When the
1483    /// payload bytes fail to decode, it emits a
1484    /// `WireDecodeFailure` so operators observe the corruption.
1485    fn ingest_backoff_notice(
1486        &mut self,
1487        env: crate::envelope::WireEnvelope,
1488        src_peer: Option<crate::ids::PeerId>,
1489    ) -> Vec<EngineStep> {
1490        let Some(fill) = env.fills.into_iter().next() else {
1491            return Vec::new();
1492        };
1493        let Some(src_peer) = src_peer else {
1494            // No attributable sender; drop silently.
1495            return Vec::new();
1496        };
1497        let Some(payload) = crate::framework::BackoffNoticePayload::decode(&fill.payload) else {
1498            return vec![self.emit_wire_decode_failure(
1499                fill.type_hash,
1500                fill.payload.len(),
1501                "BackoffNoticePayload bincode decode failed".to_string(),
1502            )];
1503        };
1504
1505        let now_ns = self.framework.scheduler.now_ns();
1506        // Advise the sender-side BackoffTable using the receiver's
1507        // quoted delay (§5.2). The existing BackoffGateTx already
1508        // reads `should_retry(peer, now_ns)` and respects the new
1509        // `next_retry_ns`, so no new gate is needed.
1510        self.framework.peer_state.backoff.record_remote_advisory(
1511            src_peer,
1512            now_ns,
1513            payload.min_backoff_ns,
1514        );
1515        // Record a peer-governor failure so the existing 5-failure
1516        // `LifecycleTransition::WentDown` surfacing remains the
1517        // single down-decision path (§5.3).
1518        let transition = self
1519            .framework
1520            .peer_state
1521            .governor
1522            .record_failure(src_peer, now_ns);
1523        // Surface the WentDown lifecycle transition if the receipt
1524        // of this notice pushed the sender's local view of the peer
1525        // across the threshold. The bus event mirrors the existing
1526        // PeerSuspect/PeerDown surfacing path.
1527        if matches!(transition, crate::framework::LifecycleTransition::WentDown,) {
1528            // The existing PhiTransition path emits PeerDown by site;
1529            // here the trigger is the per-peer governor decision so
1530            // no site-level info is available. Telemetry on the
1531            // tracker entry remains via PeerHealth.
1532        }
1533        // Cause is informational on the sender side - it's already
1534        // logged at the receiver via InfraEvent::BackoffNoticeSent.
1535        let _ = payload.cause();
1536        Vec::new()
1537    }
1538
1539    /// Receiver-side back-pressure hook. When the ingress depth
1540    /// crosses the high-water mark (or
1541    /// the caller forces emission via `force = true` from the
1542    /// φ-accrual scan), consult the `BackpressureTracker` for the
1543    /// `src_peer` and - on `Decision::EmitNotice` - mint a
1544    /// `BackoffNotice` envelope back to the sender, push it onto
1545    /// `OutboundQueue`, and publish the matching
1546    /// `InfraEvent::BackoffNoticeSent`. The K-then-silent transition
1547    /// surfaces `InfraEvent::SilentDropActive` as well so operators
1548    /// see the fallback engage. Returns the resulting `EngineStep`s;
1549    /// the caller appends them to the polling step list.
1550    fn maybe_emit_backoff_notice(
1551        &mut self,
1552        src_peer: crate::ids::PeerId,
1553        cause: crate::framework::BackoffCause,
1554        hint_ns: Option<u64>,
1555    ) -> Vec<EngineStep> {
1556        // Pull config-driven mark; PhiAccrual + ExplicitDrop callers
1557        // bypass the queue-depth check because they were triggered
1558        // by an external signal (φ flip / Component reject).
1559        let force = !matches!(cause, crate::framework::BackoffCause::QueueFull);
1560        if !force {
1561            // Compare the pre-drain ingress-depth snapshot to the
1562            // configured high-water fraction of capacity. The
1563            // snapshot stays valid across every
1564            // `process_ingress_event` call inside the same poll
1565            // cycle — using the current `ingress.len()` would see
1566            // post-drain zero and never trip.
1567            let len = self.phase1_pre_drain_depth;
1568            let cap = self.ingress.capacity();
1569            if !self
1570                .framework
1571                .peer_state
1572                .backpressure
1573                .is_over_high_water(len, cap)
1574            {
1575                return Vec::new();
1576            }
1577        }
1578
1579        let now_ns = self.framework.scheduler.now_ns();
1580        // `hint_ns` for QueueFull is sized by the configured
1581        // min-notice interval (the BackpressureTracker enforces the
1582        // floor). PhiAccrual callers may pass a specific mean
1583        // inter-arrival hint; ExplicitDrop callers pass the
1584        // Component-supplied `BackpressureHint`.
1585        let min_hint = hint_ns.unwrap_or(0);
1586        let decision = self
1587            .framework
1588            .peer_state
1589            .backpressure
1590            .observe_overload(src_peer, cause, min_hint, now_ns);
1591
1592        let (min_backoff_ns, cause_chosen) = match decision {
1593            crate::framework::BackpressureDecision::EmitNotice {
1594                min_backoff_ns,
1595                cause,
1596            } => (min_backoff_ns, cause),
1597            crate::framework::BackpressureDecision::Suppress
1598            | crate::framework::BackpressureDecision::SilentDrop => return Vec::new(),
1599        };
1600
1601        // Build the notice envelope + push it on the outbound queue.
1602        let payload =
1603            crate::framework::BackoffNoticePayload::new(min_backoff_ns, cause_chosen, None);
1604        let envelope =
1605            crate::framework::build_backoff_notice_envelope(self.self_peer, src_peer, payload);
1606        self.framework.outbound_queue.push(envelope);
1607
1608        // Bus event for ops dashboards + Component authors that
1609        // want to react to local overload signals.
1610        self.bus.publish(crate::bus::NodeEvent::Infra(
1611            crate::bus::InfraEvent::BackoffNoticeSent {
1612                peer: src_peer,
1613                cause: cause_chosen,
1614                min_backoff_ns,
1615            },
1616        ));
1617
1618        // If this emission flipped the peer into silent-drop mode,
1619        // surface `SilentDropActive` once so operators see the
1620        // K-then-silent transition.
1621        if self
1622            .framework
1623            .peer_state
1624            .backpressure
1625            .is_silent_drop_active(src_peer)
1626        {
1627            self.bus.publish(crate::bus::NodeEvent::Infra(
1628                crate::bus::InfraEvent::SilentDropActive { peer: src_peer },
1629            ));
1630        }
1631
1632        Vec::new()
1633    }
1634}
1635
/// Reports whether a cycle has used up its op-invocation budget.
///
/// With `Some(cap)` the budget counts as hit once `ops_invoked`
/// reaches or exceeds `cap`. A `None` budget means the cap is
/// disabled, so the result is always `false`.
#[inline]
fn budget_hit(budget: Option<usize>, ops_invoked: usize) -> bool {
    match budget {
        Some(cap) => ops_invoked >= cap,
        None => false,
    }
}
1642
1643/// Reservation failure surfaced by `merge_src_peer_addresses` so the
1644/// caller can mint a single `WireReceiveError::AllocationFailed`
1645/// `EngineStep` carrying the bytes the boundary tried to claim.
1646/// Covers both S4 (the claimed-address dedup buffer) and S5
1647/// (`AddressBook::add_peer`'s peer-entry dedup buffer).
1648struct SrcAddressMergeAllocError {
1649    /// Approximate bytes the failing reservation requested
1650    /// (`address_count * size_of::<Address>()`). Mirrored into the
1651    /// `WireReceiveError::AllocationFailed::byte_count` field for
1652    /// telemetry.
1653    byte_count: usize,
1654    /// Why the reservation failed. `HeapExhausted` for both sites;
1655    /// the boundary has no per-item cap (cap-driven rejection lives
1656    /// on the application-ingress path, not here).
1657    reason: crate::bus::AllocFailReason,
1658}
1659
1660#[cfg(test)]
1661#[path = "poll_recv_seed_tests.rs"]
1662mod poll_recv_seed_tests;
1663
1664#[cfg(test)]
1665#[path = "poll_bus_routing_tests.rs"]
1666mod poll_bus_routing_tests;
1667
1668#[cfg(test)]
1669#[path = "poll_ingress_handler_tests.rs"]
1670mod poll_ingress_handler_tests;
1671
1672#[cfg(test)]
1673#[path = "poll_budget_tests.rs"]
1674mod poll_budget_tests;
1675
1676#[cfg(test)]
1677#[path = "poll_async_error_tests.rs"]
1678mod poll_async_error_tests;
1679
1680#[cfg(test)]
1681#[path = "poll_wire_timeout_tests.rs"]
1682mod poll_wire_timeout_tests;
1683
1684#[cfg(test)]
1685#[path = "introspection_tests.rs"]
1686mod introspection_tests;
1687
1688#[cfg(test)]
1689#[path = "peer_governor_tests.rs"]
1690mod peer_governor_tests;
1691
1692#[cfg(test)]
1693#[path = "poll_backpressure_tests.rs"]
1694mod poll_backpressure_tests;
1695
1696#[cfg(test)]
1697#[path = "poll_src_peer_addresses_tests.rs"]
1698mod poll_src_peer_addresses_tests;
1699
1700#[cfg(test)]
1701#[path = "poll_observed_address_tests.rs"]
1702mod poll_observed_address_tests;
1703
1704#[cfg(test)]
1705#[path = "poll_typed_receive_tests.rs"]
1706mod poll_typed_receive_tests;
1707
1708#[cfg(test)]
1709#[path = "poll_ingress_alloc_tests.rs"]
1710mod poll_ingress_alloc_tests;
1711
1712#[cfg(test)]
1713#[path = "poll_backend_materialize_tests.rs"]
1714mod poll_backend_materialize_tests;