bb_runtime/engine/poll.rs
1//! 8-phase poll cycle + `handle_completion` per `docs/ENGINE.md`
2//! §7 + §9.
3use crate::engine::core::Engine;
4use crate::engine::step::EngineStep;
5use crate::framework::scheduler::TimerKind;
6use crate::ids::{CommandId, NodeSiteId};
7use crate::ingress::IngressEvent;
8use crate::slot_value::SlotValue;
9use crate::syscall::values::{BytesValue, WireReqIdValue};
10
11impl Engine {
12 /// Handle a CommandId completion per ENGINE.md §9.2.
13 /// Writes the values into the suspended Op's output sites +
14 /// pushes ready downstream consumers onto the frontier.
15 pub fn handle_completion(
16 &mut self,
17 cmd_id: CommandId,
18 values: Vec<(String, Box<dyn SlotValue>)>,
19 ) -> Vec<EngineStep> {
20 let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
21 // No matching suspension - completion arrived for a
22 // CommandId the engine doesn't know. Silently drop;
23 return Vec::new();
24 };
25
26 let mut steps = Vec::new();
27
28 // Move output_sites out of pending (pending is owned via
29 // pending_async.remove above). Helpers take &[NodeSiteId]
30 // borrows; the final step consumes by value.
31 let sites: Vec<NodeSiteId> = pending.output_sites;
32 for (i, (_name, value)) in values.into_iter().enumerate() {
33 if let Some(site) = sites.get(i).copied() {
34 self.exec
35 .slot_table
36 .insert((site, pending.exec_id), Some(value));
37 }
38 }
39
40 // Push ready downstream consumers.
41 self.push_ready_consumers(&sites, pending.exec_id);
42
43 // Function-call splice: async completion arriving inside a
44 // body's derived ExecId forwards to the caller's slots per
45 // ENGINE.md §8.4. No-op when there's no pending call context.
46 self.forward_outputs_to_caller(&sites, pending.exec_id);
47
48 // Surface top-level function outputs (no in-function consumer)
49 // as AppEvents - same path as `Engine::write_outputs`, so
50 // async-completion writes participate in the canonical
51 // function-signature → engine I/O contract.
52 self.surface_top_level_outputs(&sites, pending.exec_id);
53
54 steps.push(EngineStep::OpCompleted {
55 op_ref: pending.op_ref,
56 exec_id: pending.exec_id,
57 sites_written: sites,
58 });
59 steps
60 }
61
62 /// Handle a transport-reported failure for a suspended
63 /// `CommandId`. The Op that was waiting on `cmd_id` fails
64 /// through the existing `OpFailed` path (bus
65 /// `InfraEvent::OpFailure` + `EngineStep::OpFailed`). Use this
66 /// when the host's transport adapter learns that the remote
67 /// side failed to produce a result - the framework no longer
68 /// silently swallows the outcome.
69 pub fn handle_completion_failed(
70 &mut self,
71 cmd_id: CommandId,
72 error: crate::bus::OpError,
73 ) -> Vec<EngineStep> {
74 let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
75 // No matching suspension - failure arrived for a
76 // CommandId the engine doesn't know. Silently drop;
77 // the host's transport reconciliation should have
78 // caught this earlier.
79 return Vec::new();
80 };
81 vec![self.fail_op(
82 pending.op_ref,
83 pending.exec_id,
84 crate::bus::OpErrorKind::RemoteFailed,
85 "completion_failed",
86 error.detail,
87 )]
88 }
89
90 /// Expire any pending async suspensions whose `deadline_ns` is
91 /// past `scheduler.now_ns()`. Each expired suspension fails via
92 /// the existing `OpFailed` surface with
93 /// `OpError("deadline exceeded")`. Returns the resulting steps.
94 /// Called from Phase 5 of the poll cycle before draining
95 /// `pending_completions`, so deadline failures land in the
96 /// same poll where they expire.
97 fn expire_deadlines(&mut self) -> Vec<EngineStep> {
98 let now_ns = self.framework.scheduler.now_ns();
99 let expired: Vec<CommandId> = self
100 .exec
101 .pending_async
102 .iter()
103 .filter_map(|(cmd, p)| match p.deadline_ns {
104 Some(d) if d <= now_ns => Some(*cmd),
105 _ => None,
106 })
107 .collect();
108 let mut steps = Vec::new();
109 for cmd in expired {
110 if let Some(p) = self.exec.pending_async.remove(&cmd) {
111 steps.push(self.fail_op(
112 p.op_ref,
113 p.exec_id,
114 crate::bus::OpErrorKind::Timeout,
115 "deadline_exceeded",
116 "deadline exceeded".to_string(),
117 ));
118 }
119 }
120
121 // Drain stale in-flight wire requests. Each evicted entry
122 // surfaces as `EngineStep::WireTimeout` for observability;
123 // if it carried a `parked_op`, fail the originator's local
124 // continuation with "chain timeout" so it doesn't sit
125 // parked forever.
126 let drained = self.framework.request_tracker.drain_stale(now_ns);
127 for (wire_req_id, entry) in drained {
128 steps.push(EngineStep::WireTimeout {
129 wire_req_id,
130 target_site: entry.target_site,
131 started_at_ns: entry.started_at_ns,
132 parked_op: entry.parked_op,
133 });
134 if let Some(cmd) = entry.parked_op {
135 if let Some(p) = self.exec.pending_async.remove(&cmd) {
136 steps.push(self.fail_op(
137 p.op_ref,
138 p.exec_id,
139 crate::bus::OpErrorKind::Timeout,
140 "chain_timeout",
141 "chain timeout".to_string(),
142 ));
143 }
144 }
145 }
146 steps
147 }
148
149 /// 8-phase poll cycle per ENGINE.md §7.
150 pub fn poll(&mut self) -> Vec<EngineStep> {
151 let _poll_span = tracing::debug_span!("engine.poll").entered();
152 // GC executions that finished in the previous cycle. The
153 // one-cycle delay lets the host read completion state via
154 // `slot_at` between polls; production consumers read via the
155 // `EngineStep` stream so the delay is invisible to them.
156 self.gc_completed_executions();
157 let mut steps = Vec::new();
158 // Per-poll counter used by `cycle_op_budget` enforcement.
159 // Increments once per `invoke_one` call across Phases 2, 6,
160 // and 7. When the budget is hit, the current drain loop
161 // breaks and `CycleBudgetExceeded` is appended once.
162 let mut ops_invoked: usize = 0;
163 let mut budget_exceeded = false;
164
165 // Host-driven bootstrap kicks land via `Node::run_bootstrap`
166 // before this poll runs (empty-slice install-order kick or
167 // per-target staging). Both paths arm `bootstrap.pending`
168 // and seed the body's OpRefs onto the frontier so the
169 // drain phases below pick them up. Install no longer
170 // auto-seeds — the host owns when bootstrap starts.
171 let bootstrap_was_pending = self.bootstrap.pending;
172 // Per-phase BootstrapComplete steps accumulate here as each
173 // queued key drains. The final `WaitingOnBootstrap` /
174 // terminal `BootstrapComplete` decision below uses
175 // `bootstrap_was_pending` + post-drain `bootstrap_pending` to
176 // detect a partial drain (queue not yet empty, async pending).
177 let mut bootstrap_phases_completed: usize = 0;
178
179 // While bootstrap is pending the ingress drain consumes only
180 // events that can advance the bootstrap call (its async
181 // completions, transport failures). Body-side events
182 // (AppEvent, EnvelopeFrom, Invoke) requeue so the host
183 // observes the same pre-bootstrap delivery order on the
184 // cycle after BootstrapComplete fires. The loop re-drains
185 // when bootstrap completes mid-pass so body events queued
186 // before bootstrap finished still process in this cycle.
187 //
188 // Snapshot the pre-drain depth so per-envelope handlers see
189 // the receiver's overload signal even after the drain runs the
190 // queue to zero.
191 self.phase1_pre_drain_depth = self.ingress.len();
192 {
193 let _phase1 = tracing::debug_span!("engine.phase1_ingress").entered();
194 loop {
195 let was_pending = self.bootstrap.pending;
196 let ingress_events = self.ingress.drain_all();
197 if ingress_events.is_empty() {
198 break;
199 }
200 for event in ingress_events {
201 if self.bootstrap.pending && self.is_body_phase_ingress(&event) {
202 let _ = self.ingress.push(event);
203 continue;
204 }
205 steps.extend(self.process_ingress_event(event));
206 if self.maybe_complete_bootstrap() {
207 bootstrap_phases_completed += 1;
208 self.seed_bootstrap_call();
209 }
210 }
211 if !was_pending || self.bootstrap.pending {
212 break;
213 }
214 }
215 }
216
217 {
218 let _phase2 = tracing::debug_span!("engine.phase2_frontier_drain").entered();
219 loop {
220 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
221 let step = self.invoke_one(op_ref, exec_id);
222 steps.push(step);
223 ops_invoked += 1;
224 if budget_hit(self.cycle_op_budget, ops_invoked) {
225 budget_exceeded = true;
226 break;
227 }
228 }
229 // If a queued bootstrap phase just drained, re-seed
230 // the next one and cascade in-cycle so the host sees
231 // every BootstrapComplete + the body's first ops in a
232 // single poll when budget permits.
233 if self.maybe_complete_bootstrap() {
234 bootstrap_phases_completed += 1;
235 if budget_exceeded {
236 break;
237 }
238 if !self.seed_bootstrap_call() {
239 break;
240 }
241 } else {
242 break;
243 }
244 }
245 }
246
247 // Before draining, surface any FIFO drops accumulated since
248 // the last poll as an `InfraEvent::BusOverflow`. Publishing
249 // before drain keeps the event in this cycle's routing pass.
250 let bus_dropped = self.bus.take_dropped_count();
251 if bus_dropped > 0 {
252 self.bus.publish(crate::bus::NodeEvent::Infra(
253 crate::bus::InfraEvent::BusOverflow { count: bus_dropped },
254 ));
255 }
256 // For every NodeEvent on the bus, derive its `kind` string
257 // (via NodeEvent::kind()) and look up the subscribed
258 // `NodeSiteId`s. For each site, write a `TriggerValue` at a
259 // fresh `ExecId` and push the site's downstream consumers
260 // onto the frontier. This matches the wire delivery
261 // semantics per `docs/ADDRESSING.md`.
262 let events = self.bus.drain();
263 if !events.is_empty() {
264 let mut to_seed: Vec<crate::ids::NodeSiteId> = Vec::new();
265 for event in events {
266 let kind = event.kind();
267 let Some(sites) = self.event_subscriptions.get(kind) else {
268 continue;
269 };
270 to_seed.extend(sites.iter().copied());
271 }
272 for site in to_seed {
273 let exec_id = self.allocate_exec_id();
274 let value: Box<dyn crate::slot_value::SlotValue> =
275 Box::new(crate::syscall::values::TriggerValue);
276 self.exec.slot_table.insert((site, exec_id), Some(value));
277 let consumers: Vec<crate::ids::OpRef> = self
278 .graphs_iter()
279 .filter_map(|g| g.consumers.get(&site).cloned())
280 .flatten()
281 .collect();
282 for op_ref in consumers {
283 self.exec.frontier.push_back((op_ref, exec_id));
284 }
285 }
286 }
287
288 let now_ns = self.framework.scheduler.now_ns();
289 let matured = self.framework.scheduler.poll_matured(now_ns);
290 for kind in matured {
291 self.handle_matured_timer(kind);
292 }
293
294 // Engine-side deadline scan runs first so an expired
295 // suspension fails this cycle even if a (now-stale)
296 // completion is also queued.
297 {
298 let _phase5 = tracing::debug_span!("engine.phase5_completions").entered();
299 steps.extend(self.expire_deadlines());
300 let completions = std::mem::take(&mut self.exec.pending_completions);
301 for c in completions {
302 steps.extend(self.handle_completion(c.cmd_id, c.results));
303 }
304 if self.maybe_complete_bootstrap() {
305 bootstrap_phases_completed += 1;
306 self.seed_bootstrap_call();
307 }
308 }
309
310 // tracker entry and publish bus events on state changes.
311 {
312 let now_ns = self.framework.scheduler.now_ns();
313 let transitions = self.framework.rtt_tracker.scan_phi(now_ns);
314 for transition in transitions {
315 let event = match transition {
316 crate::framework::rtt_tracker::PhiTransition::Suspect { site, phi } => {
317 crate::bus::InfraEvent::PeerSuspect { site, phi }
318 }
319 crate::framework::rtt_tracker::PhiTransition::Down { site, phi } => {
320 crate::bus::InfraEvent::PeerDown { site, phi }
321 }
322 crate::framework::rtt_tracker::PhiTransition::Live { site } => {
323 crate::bus::InfraEvent::PeerLive { site }
324 }
325 };
326 self.bus.publish(crate::bus::NodeEvent::Infra(event));
327 }
328 }
329
330 if !budget_exceeded {
331 loop {
332 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
333 let step = self.invoke_one(op_ref, exec_id);
334 steps.push(step);
335 ops_invoked += 1;
336 if budget_hit(self.cycle_op_budget, ops_invoked) {
337 budget_exceeded = true;
338 break;
339 }
340 }
341 if self.maybe_complete_bootstrap() {
342 bootstrap_phases_completed += 1;
343 if budget_exceeded {
344 break;
345 }
346 if !self.seed_bootstrap_call() {
347 break;
348 }
349 } else {
350 break;
351 }
352 }
353 }
354
355 // For each phase queued by the host via
356 // `Engine::fire_lifecycle(phase)`, push every enrolled
357 // `LifecyclePhase` op onto the frontier with a fresh ExecId
358 // and emit a `LifecycleFired` step. Cascade-drain so newly
359 // pushed ops invoke in this same poll cycle.
360 let fired: Vec<String> = std::mem::take(&mut self.fired_phases);
361 for phase in &fired {
362 let op_refs: Vec<crate::ids::OpRef> =
363 self.lifecycle_table.get(phase).cloned().unwrap_or_default();
364 let pairs: Vec<(crate::ids::OpRef, crate::ids::ExecId)> = op_refs
365 .into_iter()
366 .map(|op_ref| (op_ref, self.allocate_exec_id()))
367 .collect();
368 for (op_ref, exec_id) in pairs {
369 self.exec.frontier.push_back((op_ref, exec_id));
370 }
371 steps.push(EngineStep::LifecycleFired {
372 phase: phase.clone(),
373 });
374 }
375 if !budget_exceeded {
376 loop {
377 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
378 let step = self.invoke_one(op_ref, exec_id);
379 steps.push(step);
380 ops_invoked += 1;
381 if budget_hit(self.cycle_op_budget, ops_invoked) {
382 budget_exceeded = true;
383 break;
384 }
385 }
386 if self.maybe_complete_bootstrap() {
387 bootstrap_phases_completed += 1;
388 if budget_exceeded {
389 break;
390 }
391 if !self.seed_bootstrap_call() {
392 break;
393 }
394 } else {
395 break;
396 }
397 }
398 }
399
400 let _phase8 = tracing::debug_span!("engine.phase8_outbound").entered();
401 for env in self.framework.outbound_queue.drain_all() {
402 steps.push(EngineStep::SendEnvelope(env));
403 }
404 // Surface peer-resolution failures captured by the wire
405 // syscall during this poll. Each entry becomes a dedicated
406 // EngineStep::PeerResolveFailed; the matching bus event was
407 // already published by the syscall, so subscribers got the
408 // routable mirror in real time.
409 for (peer, op_ref) in self.framework.pending_peer_resolve_failures.drain(..) {
410 steps.push(EngineStep::PeerResolveFailed {
411 peer,
412 op_ref,
413 exec_id: crate::ids::ExecId::from(0u64),
414 });
415 }
416 // Emit a single `OutboundDropped` step capturing FIFO drops
417 // that accumulated since the previous poll (e.g., when push
418 // exceeded `max_outbound_queue`).
419 let dropped = self.framework.outbound_queue.take_dropped_count();
420 if dropped > 0 {
421 steps.push(EngineStep::OutboundDropped { count: dropped });
422 }
423 // Surface a single `CycleBudgetExceeded` at the end so the
424 // host knows to re-poll. The step is appended after all
425 // observable per-op steps so the budget signal trails the
426 // in-cycle work it bounded.
427 if budget_exceeded {
428 steps.push(EngineStep::CycleBudgetExceeded { ops_invoked });
429 }
430 // Bootstrap state observable. Each queued bootstrap key that
431 // drained during this cycle emits its own `BootstrapComplete`
432 // — multi-target installs surface one signal per target in
433 // install order. `WaitingOnBootstrap` lands when the
434 // *currently* in-flight bootstrap op suspended on async
435 // completion and the host must drive the resumption before
436 // re-polling. Body-phase ops are gated from firing while
437 // `bootstrap_pending` is set.
438 if bootstrap_was_pending {
439 for _ in 0..bootstrap_phases_completed {
440 steps.push(EngineStep::BootstrapComplete);
441 }
442 if self.bootstrap.pending {
443 steps.push(EngineStep::WaitingOnBootstrap);
444 }
445 }
446 // Drain AppEmit / AppNotify syscall outputs into
447 // EngineStep::AppEvent. `Emit` carries serialized payload
448 // bytes; `Notify` is a marker-only event (empty `value_bytes`).
449 for ev in std::mem::take(&mut self.framework.pending_app_events) {
450 let (module_name, topic, value_bytes) = match ev {
451 crate::bus::AppEvent::Emit { name, value_bytes } => {
452 (String::new(), name, value_bytes)
453 }
454 crate::bus::AppEvent::Notify { name } => (String::new(), name, Vec::new()),
455 };
456 steps.push(EngineStep::AppEvent {
457 module_name,
458 topic,
459 value_bytes,
460 });
461 }
462
463 steps
464 }
465
466 /// Whether an ingress event would seed body-phase work. The
467 /// bootstrap gate requeues these while the bootstrap call is
468 /// outstanding so app-event delivery + envelope routing
469 /// observe the post-bootstrap engine state. Bootstrap-resuming
470 /// completions, transport failures, and host-injected timer
471 /// matures bypass the gate so the bootstrap call can progress.
472 fn is_body_phase_ingress(&self, event: &IngressEvent) -> bool {
473 matches!(
474 event,
475 IngressEvent::AppEvent { .. }
476 | IngressEvent::EnvelopeFrom { .. }
477 | IngressEvent::Invoke { .. }
478 )
479 }
480
481 /// Ingress-event router. Dispatches each event variant to its
482 /// handler: envelopes route to `deliver_inbound_internal`,
483 /// completions to `handle_completion`, app events to the bus,
484 /// matured timers to `handle_matured_timer`, invoke events to
485 /// the per-module entry point.
486 fn process_ingress_event(&mut self, event: IngressEvent) -> Vec<EngineStep> {
487 match event {
488 IngressEvent::Completion { cmd_id, results } => {
489 // The host's transport pre-decodes opaque payloads
490 // and hands them as `Vec<Vec<u8>>`. The engine wraps
491 // each entry as a `BytesValue` and forwards to
492 // `handle_completion`, which writes the slots and
493 // pushes downstream consumers.
494 let typed_results: Vec<(String, Box<dyn crate::slot_value::SlotValue>)> = results
495 .into_iter()
496 .enumerate()
497 .map(|(i, bytes)| {
498 let value = crate::syscall::values::BytesValue(bytes);
499 (
500 format!("out_{i}"),
501 Box::new(value) as Box<dyn crate::slot_value::SlotValue>,
502 )
503 })
504 .collect();
505 self.handle_completion(cmd_id, typed_results)
506 }
507 IngressEvent::EnvelopeFrom {
508 src_peer,
509 envelope,
510 src_observed_address,
511 } => {
512 // Backpressure pre-flight: silent-drop senders bypass
513 // dispatch; others route normally and then check the
514 // post-pop ingress depth against the high-water mark
515 // to emit a single BackoffNotice.
516 if self
517 .framework
518 .peer_state
519 .backpressure
520 .is_silent_drop_active(src_peer)
521 {
522 return Vec::new();
523 }
524 // Order: claimed (envelope) first so the entry
525 // exists for the observed-address registration step.
526 // Observed wins for NAT-translated cases because it
527 // appends a fresh address the claimed snapshot
528 // cannot know.
529 //
530 // The address-book hint is best-effort under allocator
531 // pressure: if the dedup buffer cannot be reserved we
532 // drop the hint, emit `WireReceiveError::AllocationFailed`,
533 // and continue routing — fills do not depend on the
534 // address book.
535 let mut steps = Vec::new();
536 if let Err(alloc) =
537 self.merge_src_peer_addresses(src_peer, &envelope.src_peer_addresses)
538 {
539 steps.push(self.emit_wire_receive_error(
540 Some(src_peer),
541 0,
542 0,
543 alloc.byte_count,
544 crate::bus::WireReceiveErrorKind::AllocationFailed {
545 byte_count: alloc.byte_count,
546 reason: alloc.reason,
547 },
548 ));
549 }
550 if let Some(observed) = src_observed_address {
551 self.merge_src_observed_address(src_peer, observed);
552 }
553 steps.extend(self.route_envelope(envelope, Some(src_peer)));
554 let backoff_steps = self.maybe_emit_backoff_notice(
555 src_peer,
556 crate::framework::BackoffCause::QueueFull,
557 None,
558 );
559 steps.extend(backoff_steps);
560 steps
561 }
562 IngressEvent::AppEvent {
563 module_name,
564 input_name,
565 value_bytes,
566 } => self.deliver_app_event(module_name, input_name, value_bytes),
567 IngressEvent::Invoke {
568 module_name,
569 inputs,
570 exec_id,
571 } => self.deliver_invoke(module_name, inputs, exec_id),
572 IngressEvent::TimerMatured { at_ns } => {
573 self.framework.scheduler.set_now(at_ns);
574 let matured = self.framework.scheduler.poll_matured(at_ns);
575 let mut out = Vec::new();
576 for kind in matured {
577 out.extend(self.handle_matured_timer(kind));
578 }
579 out
580 }
581 IngressEvent::CompletionFailed { cmd_id, detail } => {
582 // Async-completion FAILURE: route to the typed
583 // OpFailed path via handle_completion_failed so the
584 // parked op fails as itself rather than as a
585 // success-bytes payload.
586 self.handle_completion_failed(
587 cmd_id,
588 crate::bus::OpError {
589 detail,
590 ..Default::default()
591 },
592 )
593 }
594 IngressEvent::SendFailed {
595 wire_req_id,
596 peer: _peer,
597 reason: _reason,
598 } => {
599 // Transport-side delivery failure. Consumes the
600 // in-flight registration so the request tracker doesn't
601 // leak the entry; the parked originator op's failure is
602 // surfaced by the wire-timeout drain (`drain_stale`)
603 // rather than waiting for the TTL to elapse.
604 let now_ns = self.framework.scheduler.now_ns();
605 let _ = self
606 .framework
607 .request_tracker
608 .observe_response(wire_req_id, now_ns);
609 Vec::new()
610 }
611 IngressEvent::AppIngressError {
612 source,
613 byte_count,
614 kind,
615 } => {
616 // Cross-thread bridge for `CompletionSink::complete`
617 // exceeding the per-completion result cap. Re-publish
618 // on the bus so subscribers see the rejection
619 // alongside the synchronous emissions from
620 // `Node::deliver_event` / `Node::invoke`.
621 self.bus.publish(crate::bus::NodeEvent::Infra(
622 crate::bus::InfraEvent::AppIngressError {
623 source,
624 byte_count,
625 kind,
626 },
627 ));
628 Vec::new()
629 }
630 }
631 }
632
633 /// Deliver an inbound `AppEvent`: look up the addressed graph,
634 /// resolve the `input_name` to a `NodeSiteId`, wrap the bytes as
635 /// a `BytesValue`, seed the slot at a fresh `ExecId`, and push
636 /// ready downstream consumers onto the frontier. Surfaces an
637 /// observable `EngineStep::AppEvent` so the host can confirm
638 /// delivery even if no consumer exists yet.
639 fn deliver_app_event(
640 &mut self,
641 module_name: String,
642 input_name: String,
643 value_bytes: Vec<u8>,
644 ) -> Vec<EngineStep> {
645 let step = EngineStep::AppEvent {
646 module_name: module_name.clone(),
647 topic: input_name.clone(),
648 value_bytes: value_bytes.clone(),
649 };
650 let Some(graph) = self.graph(&module_name) else {
651 return vec![step];
652 };
653 let Some(&site) = graph.site_names.get(&input_name) else {
654 return vec![step];
655 };
656 let exec_id = self.allocate_exec_id();
657 let value = crate::syscall::values::BytesValue(value_bytes);
658 self.exec
659 .slot_table
660 .insert((site, exec_id), Some(Box::new(value)));
661 self.push_ready_consumers(&[site], exec_id);
662 vec![step]
663 }
664
665 /// Deliver an inbound `Invoke`: seed every `(input_name,
666 /// value_bytes)` pair into the addressed graph's matching site
667 /// at the supplied `exec_id`, then push the ready consumers
668 /// onto the frontier. Unknown modules / unknown input names are
669 /// silent no-ops (the host can detect via subsequent polls
670 /// producing no steps).
671 fn deliver_invoke(
672 &mut self,
673 module_name: String,
674 inputs: Vec<(String, Vec<u8>)>,
675 exec_id: crate::ids::ExecId,
676 ) -> Vec<EngineStep> {
677 let Some(graph) = self.graph(&module_name) else {
678 return Vec::new();
679 };
680 let mut seeded_sites: Vec<crate::ids::NodeSiteId> = Vec::new();
681 let pairs: Vec<(crate::ids::NodeSiteId, Vec<u8>)> = inputs
682 .into_iter()
683 .filter_map(|(name, bytes)| graph.site_names.get(&name).map(|&site| (site, bytes)))
684 .collect();
685 for (site, bytes) in pairs {
686 let value = crate::syscall::values::BytesValue(bytes);
687 self.exec
688 .slot_table
689 .insert((site, exec_id), Some(Box::new(value)));
690 seeded_sites.push(site);
691 }
692 if !seeded_sites.is_empty() {
693 self.push_ready_consumers(&seeded_sites, exec_id);
694 }
695 Vec::new()
696 }
697
698 /// Route a single inbound envelope. Iterates each `SlotFill` and
699 /// dispatches it via its multiaddr `dest_suffix` per
700 /// `docs/ADDRESSING.md`. The receiver doesn't consult any
701 /// subscription table or routing map - the address suffix is the
702 /// routing key. Two suffix shapes are supported:
703 /// - `/site/<NodeSiteId>` → data-plane slot fill.
704 /// - `/component/<cref>/op/<name>` → control-plane component
705 /// dispatch.
706 fn route_envelope(
707 &mut self,
708 env: crate::envelope::WireEnvelope,
709 src_peer: Option<crate::ids::PeerId>,
710 ) -> Vec<EngineStep> {
711 // Sender-side back-pressure ingest. Inbound envelopes whose
712 // first fill carries the reserved
713 // `BackoffNoticePayload` type-hash are framework-internal -
714 // intercept them here, decode the payload, advise the
715 // sender-side `BackoffTable`, and short-circuit the normal
716 // data-plane / control-plane delivery so user Components
717 // never observe a notice envelope.
718 if env
719 .fills
720 .first()
721 .is_some_and(|f| f.type_hash == crate::framework::backoff_notice_type_hash())
722 {
723 return self.ingest_backoff_notice(env, src_peer);
724 }
725
726 let correlation = env.correlation.as_ref().map(|c| c.wire_req_id).unwrap_or(0);
727
728 // hook. If the inbound envelope's wire_req_id matches an
729 // in-flight outbound round trip we registered at dispatch,
730 // pop the sample + feed it into the RTT tracker so the
731 // hierarchical-fallback EMAs converge on real per-edge,
732 // per-site, per-chain, and global RTT distributions.
733 let mut response_from_site: Option<crate::ids::NodeSiteId> = None;
734 if correlation != 0 {
735 let now_ns = self.framework.scheduler.now_ns();
736 if let Some(sample) = self
737 .framework
738 .request_tracker
739 .observe_response(correlation, now_ns)
740 {
741 self.framework.rtt_tracker.observe_round_trip(
742 sample.target_site,
743 sample.chain,
744 sample.elapsed_ns,
745 now_ns,
746 );
747 response_from_site = Some(sample.target_site);
748 }
749 }
750
751 // piggyback. The sender attached EdgeRttReport entries
752 // describing its observed outgoing edges in the chain. We
753 // record each report against the entry for the sending
754 // site so multi-hop chain budgets can compose from this
755 // direct neighbor's table.
756 if let Some(from_site) = response_from_site {
757 for report in &env.edge_rtt_reports {
758 self.framework.rtt_tracker.ingest_reported_outgoing(
759 from_site,
760 crate::ids::NodeSiteId::from(report.next_hop_site_id),
761 report.chain_id,
762 report.srtt_ns,
763 report.rttvar_ns,
764 report.sample_count,
765 );
766 }
767 }
768
769 // Capture the inbound envelope's deadline budget + arrival
770 // timestamp so consumer ops (especially `wire.Send` while
771 // forwarding) can propagate them per Dapper.
772 let inbound_remaining_deadline_ns = if env.remaining_deadline_ns > 0 {
773 Some(env.remaining_deadline_ns)
774 } else {
775 None
776 };
777 let arrival_ns = self.framework.scheduler.now_ns();
778
779 let mut steps = Vec::new();
780 for (fill_index, fill) in env.fills.into_iter().enumerate() {
781 steps.extend(self.deliver_fill(
782 fill,
783 fill_index as u32,
784 correlation,
785 src_peer,
786 arrival_ns,
787 inbound_remaining_deadline_ns,
788 ));
789 }
790 steps
791 }
792
793 /// Dispatch one `SlotFill` per `docs/ADDRESSING.md`. Parses
794 /// `fill.dest_suffix` as a multiaddr and routes by the trailing
795 /// segment shape. `fill_index` is the fill's 0-based position
796 /// within the inbound envelope; surfaces on per-fill failure
797 /// events so subscribers can identify the failing fill when
798 /// the envelope partial-delivers.
799 fn deliver_fill(
800 &mut self,
801 fill: crate::envelope::SlotFill,
802 fill_index: u32,
803 wire_req_id: u64,
804 src_peer: Option<crate::ids::PeerId>,
805 arrival_ns: u64,
806 inbound_remaining_deadline_ns: Option<u64>,
807 ) -> Vec<EngineStep> {
808 let addr = match crate::framework::Address::from_bytes(&fill.dest_suffix) {
809 Ok(a) => a,
810 Err(e) => {
811 return vec![self.emit_wire_decode_failure(
812 0,
813 fill.payload.len(),
814 format!("dest_suffix parse: {e}"),
815 )];
816 }
817 };
818
819 // Data-plane suffix: /site/<NodeSiteId>. The Site segment
820 // uniquely identifies the slot (NodeSiteIds are globally
821 // unique within a Node).
822 if let Some(site_id) = addr.site_id() {
823 return self.deliver_data_plane_fill(
824 site_id,
825 fill,
826 fill_index,
827 src_peer,
828 wire_req_id,
829 arrival_ns,
830 inbound_remaining_deadline_ns,
831 );
832 }
833
834 // Control-plane suffix: /component/<cref>/op/<name>.
835 if let (Some(cref), Some(op_name)) = (addr.component_ref(), addr.op_name()) {
836 let op_name = op_name.to_string();
837 return self.deliver_control_plane_fill(cref, op_name, fill, wire_req_id);
838 }
839
840 vec![self.emit_wire_decode_failure(
841 0,
842 fill.payload.len(),
843 "address shape neither data-plane nor control-plane".to_string(),
844 )]
845 }
846
847 /// Publish a `WireDecodeFailure` onto the bus and return the
848 /// matching `EngineStep`. The bus event lets in-process
849 /// telemetry subscribers react; the EngineStep surfaces the
850 /// same context to the host poll() caller.
851 fn emit_wire_decode_failure(
852 &mut self,
853 hash: u64,
854 payload_size: usize,
855 detail: String,
856 ) -> EngineStep {
857 self.bus.publish(crate::bus::NodeEvent::Infra(
858 crate::bus::InfraEvent::WireDecodeFailure {
859 hash,
860 payload_size,
861 detail: detail.clone(),
862 },
863 ));
864 EngineStep::WireDecodeFailed {
865 hash,
866 payload_size,
867 detail,
868 }
869 }
870
871 /// Data-plane delivery: decode the fill payload into a typed
872 /// `SlotValue` via the wire decoder registry, write it into the
873 /// addressed slot at a fresh `ExecId`, and push the slot's
874 /// downstream consumers onto the frontier. Walks each installed
875 /// graph's `consumers` map for the matching `NodeSiteId`.
876 ///
877 /// Per-fill failures (unknown type-hash, type mismatch against
878 /// the slot's declared wire type, decoder error) surface as a
879 /// `WireReceiveError` InfraEvent + matching `WireReceiveFailed`
880 /// EngineStep. The failing fill drops; sibling fills in the
881 /// same envelope still deliver (the caller continues iterating).
882 #[allow(clippy::too_many_arguments)]
883 fn deliver_data_plane_fill(
884 &mut self,
885 site_id: crate::ids::NodeSiteId,
886 mut fill: crate::envelope::SlotFill,
887 fill_index: u32,
888 src_peer: Option<crate::ids::PeerId>,
889 wire_req_id: u64,
890 arrival_ns: u64,
891 inbound_remaining_deadline_ns: Option<u64>,
892 ) -> Vec<EngineStep> {
893 // Resolve the consumer ops from each installed graph; a
894 // NodeSiteId belongs to at most one graph because IDs are
895 // globally unique, but we tolerate empty lookups.
896 let consumers: Vec<crate::ids::OpRef> = self
897 .graphs_iter()
898 .filter_map(|g| g.consumers.get(&site_id).cloned())
899 .flatten()
900 .collect();
901
902 // Resolve the typed `SlotValue` BEFORE allocating an
903 // ExecId or stamping inbound context: failure modes return
904 // a WireReceiveError step and the envelope's other fills
905 // continue to deliver without polluting the slot table.
906 // Trigger fills bypass the decoder lookup entirely.
907 let value: Box<dyn crate::slot_value::SlotValue> = if fill.trigger_only {
908 Box::new(crate::syscall::values::TriggerValue)
909 } else {
910 match self.decode_typed_fill(&mut fill, fill_index, site_id, src_peer) {
911 Ok(v) => v,
912 Err(step) => return vec![step],
913 }
914 };
915
916 let exec_id = self.allocate_exec_id();
917 // Stamp the inbound envelope context for this ExecId.
918 // Components access this through `RuntimeResourceRef` (RX
919 // gates filter on `src_peer`; `wire.Send` forwarding inside a
920 // chain reuses `wire_req_id` + propagates `remaining_deadline_ns`
921 // minus elapsed local time).
922 self.framework.inbound_contexts.insert(
923 exec_id,
924 crate::framework::InboundContext {
925 src_peer,
926 wire_req_id: if wire_req_id != 0 {
927 Some(wire_req_id)
928 } else {
929 None
930 },
931 arrival_ns: Some(arrival_ns),
932 remaining_deadline_ns: inbound_remaining_deadline_ns,
933 },
934 );
935 // `slot_write` routes through the engine's budget-release
936 // bookkeeping so a wire-receive overwriting a prior carrier
937 // releases the prior `charged_bytes()` against
938 // `ingress_bytes_in_flight`. Fresh-slot writes (the common
939 // case here, since `exec_id` is freshly allocated) hit the
940 // no-prior branch and incur the same cost as the raw
941 // `slot_table.insert`.
942 self.slot_write(site_id, exec_id, value);
943
944 // If site_id is a wire.Recv's payload site, also populate the
945 // paired sender site with PeerIdValue(src_peer) for the same
946 // ExecId. Downstream user ops read this as a graph value to
947 // identify provenance; reply-to is `g.net_out(name, sender, reply)`.
948 let sender_site: Option<crate::ids::NodeSiteId> = self
949 .graphs_iter()
950 .find_map(|g| g.recv_sender_sites.get(&site_id).copied());
951 if let (Some(sender_site), Some(peer)) = (sender_site, src_peer) {
952 let sender_value: Box<dyn crate::slot_value::SlotValue> =
953 Box::new(crate::syscall::values::PeerIdValue(peer));
954 self.exec
955 .slot_table
956 .insert((sender_site, exec_id), Some(sender_value));
957 }
958
959 for op_ref in consumers {
960 self.exec.frontier.push_back((op_ref, exec_id));
961 }
962 Vec::new()
963 }
964
965 /// Resolve a non-trigger data-plane fill into its typed
966 /// `SlotValue` carrier. The routing tree branches on the
967 /// destination slot's binding:
968 ///
969 /// - **Backend-bound slot** — the engine takes ownership of
970 /// `fill.payload` via `std::mem::take` (zero-copy ownership
971 /// transfer; `fill.payload` is already framework-owned from
972 /// envelope decode) and hands it to the bound backend's
973 /// `materialize_from_wire`. The typed tensor lands inside a
974 /// `BackendTensorCarrier` whose `charged_bytes` + `backend_ref`
975 /// are stamped to the engine-side accounting before the carrier
976 /// is returned for slot-table install.
977 /// - **Framework-carrier slot** — the wire decoder registry
978 /// resolves the `type_hash` to a bincode decoder; the decoded
979 /// `SlotValue` rides on as-is.
980 ///
981 /// Failure modes surface as typed `WireReceiveError` InfraEvents
982 /// + matching `WireReceiveFailed` EngineSteps:
983 ///
984 /// - **TypeMismatch** — destination slot declares an expected
985 /// wire-type hash via `GraphSlot::recv_wire_type_hash` and
986 /// the fill's `type_hash` does not match. Checked first so a
987 /// mis-typed payload never reaches the decoder.
988 /// - **UnknownTypeHash** — framework-carrier path only; no
989 /// decoder is registered for the stamped hash.
990 /// - **DecodeFailed** — registered decoder ran and rejected
991 /// the bytes (framework-carrier path).
992 /// - **BudgetExceeded** — admitting the bytes would push the
993 /// engine over `NodeConfig::ingress_byte_budget`.
994 /// - **BackendMaterializeFailed** — the bound backend's typed
995 /// `materialize_from_wire` returned `Err` (backend path).
996 ///
997 /// The `Err` carries the typed `EngineStep` the caller will
998 /// surface to `Engine::poll`'s return value. `EngineStep` is
999 /// load-bearing for the host's failure visibility surface, so
1000 /// boxing it here would force every caller through an
1001 /// indirection layer that adds no clarity for a single
1002 /// internal call site.
1003 ///
1004 /// **No per-fill scratch buffer.** Backend-mediated tensor fills
1005 /// move `fill.payload` into `materialize_from_wire` via
1006 /// `std::mem::take`; the empty `Vec<u8>` left behind drops with
1007 /// the `SlotFill`. The only memcpy on the tensor path is the
1008 /// one the backend chooses to do — or skips via zero-copy
1009 /// adoption (`ArrayD::from_shape_vec` when alignment permits).
1010 #[allow(clippy::result_large_err)]
1011 fn decode_typed_fill(
1012 &mut self,
1013 fill: &mut crate::envelope::SlotFill,
1014 fill_index: u32,
1015 site_id: crate::ids::NodeSiteId,
1016 src_peer: Option<crate::ids::PeerId>,
1017 ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1018 let expected_hash: Option<u64> = self
1019 .graphs_iter()
1020 .find_map(|g| g.recv_wire_type_hash.get(&site_id).copied());
1021 if let Some(expected) = expected_hash {
1022 if expected != fill.type_hash {
1023 return Err(self.emit_wire_receive_error(
1024 src_peer,
1025 fill_index,
1026 fill.type_hash,
1027 fill.payload.len(),
1028 crate::bus::WireReceiveErrorKind::TypeMismatch {
1029 expected_hash: expected,
1030 },
1031 ));
1032 }
1033 }
1034
1035 // Resolve the destination slot's binding so the backend-bound
1036 // branch can fire before the framework-carrier registry
1037 // lookup. A site without a recv_site → slot_id mapping (no
1038 // role consumes the Recv payload) takes the framework path.
1039 let slot_id = self
1040 .graphs_iter()
1041 .find_map(|g| g.recv_site_to_slot_id.get(&site_id).copied());
1042 let role_ref = slot_id.and_then(|id| self.role_ref_for_slot_id(id));
1043
1044 // Budget guard (uniform across branches): pre-charge the
1045 // fill's payload length against `NodeConfig::ingress_byte_budget`
1046 // before invoking either decoder. Successful admission leaves
1047 // the charge in the counter; failure paths release before
1048 // returning.
1049 let byte_count = fill.payload.len();
1050 if let Err(reason) = self.try_charge(byte_count) {
1051 return Err(self.emit_wire_receive_error(
1052 src_peer,
1053 fill_index,
1054 fill.type_hash,
1055 byte_count,
1056 crate::bus::WireReceiveErrorKind::BudgetExceeded {
1057 byte_count: reason.byte_count,
1058 budget_remaining: reason.budget_remaining,
1059 },
1060 ));
1061 }
1062
1063 if let Some((crate::registry::ComponentRole::Backend, backend_ref)) = role_ref {
1064 return self.materialize_via_backend(
1065 fill,
1066 fill_index,
1067 src_peer,
1068 byte_count,
1069 backend_ref,
1070 );
1071 }
1072
1073 let Some(decoder) = crate::slot_value::wire_decoder_registry()
1074 .get(&fill.type_hash)
1075 .copied()
1076 else {
1077 self.release(byte_count);
1078 return Err(self.emit_wire_receive_error(
1079 src_peer,
1080 fill_index,
1081 fill.type_hash,
1082 fill.payload.len(),
1083 crate::bus::WireReceiveErrorKind::UnknownTypeHash,
1084 ));
1085 };
1086 decoder(&fill.payload).map_err(|e| {
1087 self.release(byte_count);
1088 self.emit_wire_receive_error(
1089 src_peer,
1090 fill_index,
1091 fill.type_hash,
1092 byte_count,
1093 crate::bus::WireReceiveErrorKind::DecodeFailed {
1094 error_summary: e.to_string(),
1095 },
1096 )
1097 })
1098 }
1099
1100 /// Backend-bound branch of [`Self::decode_typed_fill`]. Hands the
1101 /// inbound bytes to the bound backend via the role-dispatch
1102 /// registry; wraps the typed `Self::Tensor` in a
1103 /// `BackendTensorCarrier` whose engine-side accounting fields
1104 /// (`charged_bytes`, `backend_ref`) are stamped before the
1105 /// carrier is returned. On error releases the byte charge and
1106 /// emits `BackendMaterializeFailed`.
1107 #[allow(clippy::result_large_err)]
1108 fn materialize_via_backend(
1109 &mut self,
1110 fill: &mut crate::envelope::SlotFill,
1111 fill_index: u32,
1112 src_peer: Option<crate::ids::PeerId>,
1113 byte_count: usize,
1114 backend_ref: crate::ids::ComponentRef,
1115 ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1116 // `mem::take` transfers ownership of `fill.payload` to the
1117 // backend at zero cost: the wire bytes are already
1118 // framework-owned (prost allocated them during envelope
1119 // decode), and the empty `Vec<u8>` left in `fill.payload`
1120 // drops with the `SlotFill`. No scratch buffer, no memcpy on
1121 // the framework side.
1122 let bytes = std::mem::take(&mut fill.payload);
1123 let type_hash = fill.type_hash;
1124
1125 // Take the backend component out of the Vec so dispatch can
1126 // borrow it without holding a long lease on `engine.components`.
1127 // Restore on the way out — even on error paths.
1128 let Some(mut taken) = self.take_component(backend_ref) else {
1129 self.release(byte_count);
1130 return Err(self.emit_wire_receive_error(
1131 src_peer,
1132 fill_index,
1133 type_hash,
1134 byte_count,
1135 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1136 backend_ref,
1137 backend_error_summary: "backend component slot empty".to_string(),
1138 },
1139 ));
1140 };
1141
1142 // Reach the backend through the per-T dispatcher in
1143 // `role_dispatchers`. The dispatcher closes over the typed
1144 // `Self::Tensor` so the boxed `SlotValue` returned here is
1145 // already a `BackendTensorCarrier` (the derive bridge does
1146 // the wrap).
1147 let any: &mut dyn std::any::Any = taken.as_mut();
1148 let tid = (*any).type_id();
1149 let dispatcher = self.role_dispatchers.get(&tid).map(|d| d.materialize);
1150
1151 let result = if let Some(materialize) = dispatcher {
1152 (materialize)(any, type_hash, bytes)
1153 } else {
1154 Err(crate::slot_value::BackendMaterializeError {
1155 summary: "no BackendRuntime dispatcher registered".to_string(),
1156 })
1157 };
1158
1159 self.restore_component(backend_ref, taken);
1160
1161 match result {
1162 Ok(boxed) => {
1163 // Downcast to `BackendTensorCarrier` and stamp the
1164 // engine-side accounting fields the derive bridge left
1165 // as placeholders. The bridge constructs the carrier
1166 // with the typed clone / encode fn pointers; the
1167 // engine owns the budget counter and the backend
1168 // identity, so it fills them in here. The downcast
1169 // is infallible by the bridge's construction; any
1170 // hand-rolled `BackendRuntime::materialize_from_wire`
1171 // that returns a non-carrier `SlotValue` flows
1172 // through unchanged (no accounting stamp), which is
1173 // the right behaviour because non-carrier returns
1174 // never charge against the backend-tensor pool.
1175 let any_box = boxed.into_any_boxed();
1176 let final_boxed: Box<dyn crate::slot_value::SlotValue> =
1177 match any_box.downcast::<crate::slot_value::BackendTensorCarrier>() {
1178 Ok(mut carrier) => {
1179 carrier.charged_bytes = byte_count;
1180 carrier.backend_ref = backend_ref;
1181 carrier
1182 }
1183 Err(other) => {
1184 // The dispatcher returned a non-carrier
1185 // `SlotValue`; route it through unchanged.
1186 // `Box<dyn Any + Send + Sync>` downcasts
1187 // back to `Box<dyn SlotValue>` only via a
1188 // typed re-box, which we don't do — log
1189 // and release the budget charge instead.
1190 let _ = other;
1191 self.release(byte_count);
1192 return Err(self.emit_wire_receive_error(
1193 src_peer,
1194 fill_index,
1195 type_hash,
1196 byte_count,
1197 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1198 backend_ref,
1199 backend_error_summary:
1200 "backend bridge returned non-carrier SlotValue".to_string(),
1201 },
1202 ));
1203 }
1204 };
1205 Ok(final_boxed)
1206 }
1207 Err(e) => {
1208 self.release(byte_count);
1209 Err(self.emit_wire_receive_error(
1210 src_peer,
1211 fill_index,
1212 type_hash,
1213 byte_count,
1214 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1215 backend_ref,
1216 backend_error_summary: e.summary,
1217 },
1218 ))
1219 }
1220 }
1221 }
1222
1223 /// Publish a `WireReceiveError` on the bus and return the
1224 /// matching `EngineStep`. Mirrors `emit_wire_decode_failure`
1225 /// for the per-fill typed-decode failure surface.
1226 fn emit_wire_receive_error(
1227 &mut self,
1228 src_peer: Option<crate::ids::PeerId>,
1229 fill_index: u32,
1230 actual_hash: u64,
1231 payload_size: usize,
1232 kind: crate::bus::WireReceiveErrorKind,
1233 ) -> EngineStep {
1234 self.bus.publish(crate::bus::NodeEvent::Infra(
1235 crate::bus::InfraEvent::WireReceiveError {
1236 src_peer,
1237 fill_index,
1238 actual_hash,
1239 payload_size,
1240 kind: kind.clone(),
1241 },
1242 ));
1243 EngineStep::WireReceiveFailed {
1244 src_peer,
1245 fill_index,
1246 actual_hash,
1247 payload_size,
1248 kind,
1249 }
1250 }
1251
1252 /// Control-plane delivery: invoke
1253 /// `component[cref].dispatch_atomic(op_name, [(payload,
1254 /// correlation, ...)], ctx)`. The component decodes the payload
1255 /// bytes via its own protocol logic.
1256 fn deliver_control_plane_fill(
1257 &mut self,
1258 component_ref: crate::ids::ComponentRef,
1259 op_name: String,
1260 fill: crate::envelope::SlotFill,
1261 wire_req_id: u64,
1262 ) -> Vec<EngineStep> {
1263 let payload = BytesValue(fill.payload);
1264 let correlation = WireReqIdValue(wire_req_id);
1265
1266 let inputs_storage: Vec<(String, Box<dyn SlotValue>)> = vec![
1267 ("payload".to_string(), Box::new(payload)),
1268 ("correlation".to_string(), Box::new(correlation)),
1269 ];
1270
1271 let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> = inputs_storage
1272 .iter()
1273 .map(|(n, h)| (n.as_str(), h.as_ref()))
1274 .collect();
1275
1276 // D2 take-and-restore so we can split-borrow the rest of
1277 // `self.framework` / `self.bus` etc. while the dispatching
1278 // component is held exclusively.
1279 let Some(mut taken) = self.take_component(component_ref) else {
1280 return Vec::new();
1281 };
1282
1283 let mut ctx = crate::runtime::RuntimeResourceRef {
1284 peers: crate::runtime::PeerCtx {
1285 gate: &mut self.framework.peer_state.gate,
1286 backoff: &mut self.framework.peer_state.backoff,
1287 governor: &mut self.framework.peer_state.governor,
1288 addresses: &mut self.framework.address_book,
1289 backpressure: &mut self.framework.peer_state.backpressure,
1290 },
1291 net: crate::runtime::NetCtx {
1292 outbound: &mut self.framework.outbound_queue,
1293 rtt: &mut self.framework.rtt_tracker,
1294 requests: &mut self.framework.request_tracker,
1295 dedup: &mut self.framework.inbound_dedup,
1296 pending_peer_resolve_failures: &mut self.framework.pending_peer_resolve_failures,
1297 },
1298 time: crate::runtime::TimeCtx {
1299 scheduler: &mut self.framework.scheduler,
1300 },
1301 syscall: crate::runtime::SyscallCtx {
1302 serialize_queue: &mut self.framework.serialize_queue,
1303 hold_table: &mut self.framework.hold_table,
1304 record_buffer: &mut self.framework.record_buffer,
1305 event_source: &mut self.framework.event_source,
1306 counters: &mut self.framework.counters,
1307 any_fired_groups: &mut self.framework.any_fired_groups,
1308 deadline_match_fired: &mut self.framework.deadline_match_fired,
1309 rng: &mut *self.framework.rng,
1310 pending_app_events: &mut self.framework.pending_app_events,
1311 },
1312 bus: &mut self.bus,
1313 ingress: std::sync::Arc::clone(&self.ingress),
1314 components: crate::runtime::ComponentsView {
1315 instances: Some(&self.components),
1316 slots: Some(&self.slots),
1317 },
1318 current: crate::runtime::CurrentCallCtx {
1319 op_ref: crate::ids::OpRef::from(0u64),
1320 exec_id: crate::ids::ExecId::from(0u64),
1321 self_peer: self.self_peer,
1322 node_attributes: &[],
1323 node_metadata: &[],
1324 inbound: crate::runtime::InboundCtx {
1325 src_peer: None,
1326 wire_req_id: None,
1327 arrival_ns: None,
1328 remaining_deadline_ns: None,
1329 },
1330 pending_completions: Vec::new(),
1331 next_command_id: &mut self.exec.ids.next_command_id,
1332 },
1333 };
1334
1335 let _ = crate::engine::invoke::call_protocol_dispatch_atomic(
1336 taken.as_mut(),
1337 &op_name,
1338 &inputs_for_dispatch,
1339 &mut ctx,
1340 &self.role_dispatchers,
1341 );
1342 let captured = std::mem::take(&mut ctx.current.pending_completions);
1343 drop(ctx);
1344 self.exec.pending_completions.extend(captured);
1345
1346 self.restore_component(component_ref, taken);
1347
1348 Vec::new()
1349 }
1350
1351 /// Route a matured timer to its consumer.
1352 /// - `Sleep`/`Completion` fulfil a pending `CommandId`.
1353 /// - `Interval` re-pushes its owning Op onto the frontier at a
1354 /// fresh `ExecId`; the Op's `invoke` re-schedules the next
1355 /// firing and emits the periodic `TriggerValue` downstream.
1356 /// - `After` fulfils the parked `CommandId` (which the Op
1357 /// suspended on) with a single `TriggerValue`.
1358 fn handle_matured_timer(
1359 &mut self,
1360 kind: crate::framework::scheduler::TimerKind,
1361 ) -> Vec<EngineStep> {
1362 match kind {
1363 TimerKind::Sleep(cmd_id) | TimerKind::Completion(cmd_id) => {
1364 self.handle_completion(cmd_id, Vec::new())
1365 }
1366 TimerKind::Interval { key, .. } => {
1367 let op_ref = crate::ids::OpRef::from(key);
1368 let exec_id = self.allocate_exec_id();
1369 self.exec.frontier.push_back((op_ref, exec_id));
1370 Vec::new()
1371 }
1372 TimerKind::After { key } => {
1373 let cmd_id = CommandId::from(key);
1374 let value: Box<dyn SlotValue> = Box::new(crate::syscall::values::TriggerValue);
1375 self.handle_completion(cmd_id, vec![("trigger".to_string(), value)])
1376 }
1377 }
1378 }
1379
1380 /// Merge a sender-claimed `src_peer_addresses` list into the
1381 /// receiver's `AddressBook` entry for `src_peer`. Empty list is
1382 /// a no-op (the sender chose not to advertise). The
1383 /// skip-on-unchanged guard compares the decoded list to the
1384 /// existing entry via slice equality and elides the write when
1385 /// they match — without this the receiver would rewrite the
1386 /// entry once per envelope, swamping the address book with
1387 /// idempotent updates under load.
1388 ///
1389 /// Returns `Err(SrcAddressMergeAllocError)` when the dedup
1390 /// buffer (S4) or the address-book peer dedup (S5) cannot
1391 /// reserve. The caller surfaces this as
1392 /// `WireReceiveError::AllocationFailed`; the address-book hint
1393 /// is best-effort under allocator pressure (the envelope's
1394 /// other fills still route).
1395 fn merge_src_peer_addresses(
1396 &mut self,
1397 src_peer: crate::ids::PeerId,
1398 claimed_bytes: &[Vec<u8>],
1399 ) -> Result<(), SrcAddressMergeAllocError> {
1400 if claimed_bytes.is_empty() {
1401 return Ok(());
1402 }
1403 // S4: dedup buffer for parsed Addresses. Use try_reserve_exact
1404 // so an exhausted allocator surfaces as AllocationFailed
1405 // rather than aborting the receiver.
1406 let mut claimed: Vec<crate::framework::Address> = Vec::new();
1407 let claim_count = claimed_bytes.len();
1408 if crate::fallible::try_reserve_exact(&mut claimed, claim_count).is_err() {
1409 return Err(SrcAddressMergeAllocError {
1410 byte_count: claim_count
1411 .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1412 reason: crate::bus::AllocFailReason::HeapExhausted,
1413 });
1414 }
1415 for bytes in claimed_bytes {
1416 match crate::framework::Address::from_bytes(bytes) {
1417 Ok(addr) => claimed.push(addr),
1418 // Parse failure on one segment drops the hint without
1419 // touching the address book; the envelope's fills still
1420 // route via their own dest_suffix parsing.
1421 Err(_) => return Ok(()),
1422 }
1423 }
1424 if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1425 if existing == claimed.as_slice() {
1426 return Ok(());
1427 }
1428 }
1429 // S5: `add_peer` runs its own try_reserve_exact for the
1430 // peer-entry dedup buffer. Allocation failure surfaces here
1431 // as AddressBookError::AllocationFailed; map to the same
1432 // WireReceiveErrorKind::AllocationFailed. Other errors
1433 // (Full, EmptyAddressList) are deployment-level signals
1434 // already observable elsewhere — swallow as before.
1435 match self.framework.address_book.add_peer(src_peer, claimed) {
1436 Ok(()) => Ok(()),
1437 Err(crate::framework::AddressBookError::AllocationFailed { requested }) => {
1438 Err(SrcAddressMergeAllocError {
1439 byte_count: requested
1440 .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1441 reason: crate::bus::AllocFailReason::HeapExhausted,
1442 })
1443 }
1444 Err(_) => Ok(()),
1445 }
1446 }
1447
1448 /// Merge a transport-observed source address into the
1449 /// receiver's `AddressBook` entry for `src_peer`. Skips the
1450 /// write when the entry already contains `addr` (slice
1451 /// containment check) so a steady stream of envelopes from the
1452 /// same observed endpoint costs at most one `register_address`
1453 /// call. When no entry exists yet — the sender-claimed merge
1454 /// upstream may have short-circuited on an empty list — the
1455 /// observed address bootstraps a fresh one via `add_peer`.
1456 fn merge_src_observed_address(
1457 &mut self,
1458 src_peer: crate::ids::PeerId,
1459 addr: crate::framework::Address,
1460 ) {
1461 if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1462 if existing.contains(&addr) {
1463 return;
1464 }
1465 let _ = self.framework.address_book.register_address(src_peer, addr);
1466 return;
1467 }
1468 let _ = self.framework.address_book.add_peer(src_peer, vec![addr]);
1469 }
1470
1471 /// Sender-side ingest of a `BackoffNotice` envelope. Called from
1472 /// `route_envelope` when the first fill's `type_hash` matches
1473 /// `backoff_notice_type_hash`. Decodes the payload, applies the
1474 /// remote-advised back-off via `BackoffTable::record_remote_advisory`,
1475 /// and records a `PeerGovernor::record_failure` so the existing
1476 /// 5-failure `LifecycleTransition::WentDown` path stays the single
1477 /// peer-down decision site. Returns no `EngineStep`s — the notice
1478 /// never reaches a user Component.
1479 ///
1480 /// When the source peer is unknown (the inbound `EnvelopeFrom`
1481 /// was synthesised by a transport that couldn't attribute the
1482 /// sender), the engine drops the notice silently. When the
1483 /// payload bytes fail to decode, it emits a
1484 /// `WireDecodeFailure` so operators observe the corruption.
1485 fn ingest_backoff_notice(
1486 &mut self,
1487 env: crate::envelope::WireEnvelope,
1488 src_peer: Option<crate::ids::PeerId>,
1489 ) -> Vec<EngineStep> {
1490 let Some(fill) = env.fills.into_iter().next() else {
1491 return Vec::new();
1492 };
1493 let Some(src_peer) = src_peer else {
1494 // No attributable sender; drop silently.
1495 return Vec::new();
1496 };
1497 let Some(payload) = crate::framework::BackoffNoticePayload::decode(&fill.payload) else {
1498 return vec![self.emit_wire_decode_failure(
1499 fill.type_hash,
1500 fill.payload.len(),
1501 "BackoffNoticePayload bincode decode failed".to_string(),
1502 )];
1503 };
1504
1505 let now_ns = self.framework.scheduler.now_ns();
1506 // Advise the sender-side BackoffTable using the receiver's
1507 // quoted delay (§5.2). The existing BackoffGateTx already
1508 // reads `should_retry(peer, now_ns)` and respects the new
1509 // `next_retry_ns`, so no new gate is needed.
1510 self.framework.peer_state.backoff.record_remote_advisory(
1511 src_peer,
1512 now_ns,
1513 payload.min_backoff_ns,
1514 );
1515 // Record a peer-governor failure so the existing 5-failure
1516 // `LifecycleTransition::WentDown` surfacing remains the
1517 // single down-decision path (§5.3).
1518 let transition = self
1519 .framework
1520 .peer_state
1521 .governor
1522 .record_failure(src_peer, now_ns);
1523 // Surface the WentDown lifecycle transition if the receipt
1524 // of this notice pushed the sender's local view of the peer
1525 // across the threshold. The bus event mirrors the existing
1526 // PeerSuspect/PeerDown surfacing path.
1527 if matches!(transition, crate::framework::LifecycleTransition::WentDown,) {
1528 // The existing PhiTransition path emits PeerDown by site;
1529 // here the trigger is the per-peer governor decision so
1530 // no site-level info is available. Telemetry on the
1531 // tracker entry remains via PeerHealth.
1532 }
1533 // Cause is informational on the sender side - it's already
1534 // logged at the receiver via InfraEvent::BackoffNoticeSent.
1535 let _ = payload.cause();
1536 Vec::new()
1537 }
1538
1539 /// Receiver-side back-pressure hook. When the ingress depth
1540 /// crosses the high-water mark (or
1541 /// the caller forces emission via `force = true` from the
1542 /// φ-accrual scan), consult the `BackpressureTracker` for the
1543 /// `src_peer` and - on `Decision::EmitNotice` - mint a
1544 /// `BackoffNotice` envelope back to the sender, push it onto
1545 /// `OutboundQueue`, and publish the matching
1546 /// `InfraEvent::BackoffNoticeSent`. The K-then-silent transition
1547 /// surfaces `InfraEvent::SilentDropActive` as well so operators
1548 /// see the fallback engage. Returns the resulting `EngineStep`s;
1549 /// the caller appends them to the polling step list.
1550 fn maybe_emit_backoff_notice(
1551 &mut self,
1552 src_peer: crate::ids::PeerId,
1553 cause: crate::framework::BackoffCause,
1554 hint_ns: Option<u64>,
1555 ) -> Vec<EngineStep> {
1556 // Pull config-driven mark; PhiAccrual + ExplicitDrop callers
1557 // bypass the queue-depth check because they were triggered
1558 // by an external signal (φ flip / Component reject).
1559 let force = !matches!(cause, crate::framework::BackoffCause::QueueFull);
1560 if !force {
1561 // Compare the pre-drain ingress-depth snapshot to the
1562 // configured high-water fraction of capacity. The
1563 // snapshot stays valid across every
1564 // `process_ingress_event` call inside the same poll
1565 // cycle — using the current `ingress.len()` would see
1566 // post-drain zero and never trip.
1567 let len = self.phase1_pre_drain_depth;
1568 let cap = self.ingress.capacity();
1569 if !self
1570 .framework
1571 .peer_state
1572 .backpressure
1573 .is_over_high_water(len, cap)
1574 {
1575 return Vec::new();
1576 }
1577 }
1578
1579 let now_ns = self.framework.scheduler.now_ns();
1580 // `hint_ns` for QueueFull is sized by the configured
1581 // min-notice interval (the BackpressureTracker enforces the
1582 // floor). PhiAccrual callers may pass a specific mean
1583 // inter-arrival hint; ExplicitDrop callers pass the
1584 // Component-supplied `BackpressureHint`.
1585 let min_hint = hint_ns.unwrap_or(0);
1586 let decision = self
1587 .framework
1588 .peer_state
1589 .backpressure
1590 .observe_overload(src_peer, cause, min_hint, now_ns);
1591
1592 let (min_backoff_ns, cause_chosen) = match decision {
1593 crate::framework::BackpressureDecision::EmitNotice {
1594 min_backoff_ns,
1595 cause,
1596 } => (min_backoff_ns, cause),
1597 crate::framework::BackpressureDecision::Suppress
1598 | crate::framework::BackpressureDecision::SilentDrop => return Vec::new(),
1599 };
1600
1601 // Build the notice envelope + push it on the outbound queue.
1602 let payload =
1603 crate::framework::BackoffNoticePayload::new(min_backoff_ns, cause_chosen, None);
1604 let envelope =
1605 crate::framework::build_backoff_notice_envelope(self.self_peer, src_peer, payload);
1606 self.framework.outbound_queue.push(envelope);
1607
1608 // Bus event for ops dashboards + Component authors that
1609 // want to react to local overload signals.
1610 self.bus.publish(crate::bus::NodeEvent::Infra(
1611 crate::bus::InfraEvent::BackoffNoticeSent {
1612 peer: src_peer,
1613 cause: cause_chosen,
1614 min_backoff_ns,
1615 },
1616 ));
1617
1618 // If this emission flipped the peer into silent-drop mode,
1619 // surface `SilentDropActive` once so operators see the
1620 // K-then-silent transition.
1621 if self
1622 .framework
1623 .peer_state
1624 .backpressure
1625 .is_silent_drop_active(src_peer)
1626 {
1627 self.bus.publish(crate::bus::NodeEvent::Infra(
1628 crate::bus::InfraEvent::SilentDropActive { peer: src_peer },
1629 ));
1630 }
1631
1632 Vec::new()
1633 }
1634}
1635
/// Reports whether `ops_invoked` has reached the per-cycle
/// op-invocation cap.
///
/// With no cap (`None`) the answer is always `false`. With a cap,
/// the answer is `true` once `ops_invoked` is greater than or equal
/// to it, so a cap of zero is hit immediately.
#[inline]
fn budget_hit(budget: Option<usize>, ops_invoked: usize) -> bool {
    match budget {
        Some(cap) => ops_invoked >= cap,
        None => false,
    }
}
1642
/// Allocation failure reported by `merge_src_peer_addresses`.
///
/// Produced in two places inside that function: when the buffer for
/// the parsed addresses cannot be reserved, and when
/// `AddressBook::add_peer` reports `AllocationFailed`. Per the
/// function's contract, the caller turns this into a single
/// `WireReceiveError::AllocationFailed` report.
struct SrcAddressMergeAllocError {
    /// Estimated size of the failed reservation in bytes, computed
    /// as `address_count * size_of::<Address>()` with saturating
    /// multiplication. Intended for telemetry, not as an exact
    /// allocator figure.
    byte_count: usize,
    /// Why the reservation failed. Both construction sites in
    /// `merge_src_peer_addresses` set this to `HeapExhausted`.
    reason: crate::bus::AllocFailReason,
}
1659
1660#[cfg(test)]
1661#[path = "poll_recv_seed_tests.rs"]
1662mod poll_recv_seed_tests;
1663
1664#[cfg(test)]
1665#[path = "poll_bus_routing_tests.rs"]
1666mod poll_bus_routing_tests;
1667
1668#[cfg(test)]
1669#[path = "poll_ingress_handler_tests.rs"]
1670mod poll_ingress_handler_tests;
1671
1672#[cfg(test)]
1673#[path = "poll_budget_tests.rs"]
1674mod poll_budget_tests;
1675
1676#[cfg(test)]
1677#[path = "poll_async_error_tests.rs"]
1678mod poll_async_error_tests;
1679
1680#[cfg(test)]
1681#[path = "poll_wire_timeout_tests.rs"]
1682mod poll_wire_timeout_tests;
1683
1684#[cfg(test)]
1685#[path = "introspection_tests.rs"]
1686mod introspection_tests;
1687
1688#[cfg(test)]
1689#[path = "peer_governor_tests.rs"]
1690mod peer_governor_tests;
1691
1692#[cfg(test)]
1693#[path = "poll_backpressure_tests.rs"]
1694mod poll_backpressure_tests;
1695
1696#[cfg(test)]
1697#[path = "poll_src_peer_addresses_tests.rs"]
1698mod poll_src_peer_addresses_tests;
1699
1700#[cfg(test)]
1701#[path = "poll_observed_address_tests.rs"]
1702mod poll_observed_address_tests;
1703
1704#[cfg(test)]
1705#[path = "poll_typed_receive_tests.rs"]
1706mod poll_typed_receive_tests;
1707
1708#[cfg(test)]
1709#[path = "poll_ingress_alloc_tests.rs"]
1710mod poll_ingress_alloc_tests;
1711
1712#[cfg(test)]
1713#[path = "poll_backend_materialize_tests.rs"]
1714mod poll_backend_materialize_tests;