bb_runtime/engine/poll.rs
1//! 8-phase poll cycle + `handle_completion` per `docs/ENGINE.md`
2//! §7 + §9.
3use crate::engine::core::Engine;
4use crate::engine::step::EngineStep;
5use crate::framework::scheduler::TimerKind;
6use crate::ids::{CommandId, NodeSiteId};
7use crate::ingress::IngressEvent;
8use crate::slot_value::SlotValue;
9use crate::syscall::values::{BytesValue, WireReqIdValue};
10
11impl Engine {
12 /// Handle a CommandId completion per ENGINE.md §9.2.
13 /// Writes the values into the suspended Op's output sites +
14 /// pushes ready downstream consumers onto the frontier.
15 pub fn handle_completion(
16 &mut self,
17 cmd_id: CommandId,
18 values: Vec<(String, Box<dyn SlotValue>)>,
19 ) -> Vec<EngineStep> {
20 let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
21 // No matching suspension - completion arrived for a
22 // CommandId the engine doesn't know. Silently drop;
23 return Vec::new();
24 };
25
26 let mut steps = Vec::new();
27
28 // Move output_sites out of pending (pending is owned via
29 // pending_async.remove above). Helpers take &[NodeSiteId]
30 // borrows; the final step consumes by value.
31 let sites: Vec<NodeSiteId> = pending.output_sites;
32 for (i, (_name, value)) in values.into_iter().enumerate() {
33 if let Some(site) = sites.get(i).copied() {
34 self.exec
35 .slot_table
36 .insert((site, pending.exec_id), Some(value));
37 }
38 }
39
40 // Push ready downstream consumers.
41 self.push_ready_consumers(&sites, pending.exec_id);
42
43 // Function-call splice: async completion arriving inside a
44 // body's derived ExecId forwards to the caller's slots per
45 // ENGINE.md §8.4. No-op when there's no pending call context.
46 self.forward_outputs_to_caller(&sites, pending.exec_id);
47
48 // Surface top-level function outputs (no in-function consumer)
49 // as AppEvents - same path as `Engine::write_outputs`, so
50 // async-completion writes participate in the canonical
51 // function-signature → engine I/O contract.
52 self.surface_top_level_outputs(&sites, pending.exec_id);
53
54 steps.push(EngineStep::OpCompleted {
55 op_ref: pending.op_ref,
56 exec_id: pending.exec_id,
57 sites_written: sites,
58 });
59 steps
60 }
61
62 /// Handle a transport-reported failure for a suspended
63 /// `CommandId`. The Op that was waiting on `cmd_id` fails
64 /// through the existing `OpFailed` path (bus
65 /// `InfraEvent::OpFailure` + `EngineStep::OpFailed`). Use this
66 /// when the host's transport adapter learns that the remote
67 /// side failed to produce a result - the framework no longer
68 /// silently swallows the outcome.
69 pub fn handle_completion_failed(
70 &mut self,
71 cmd_id: CommandId,
72 error: crate::bus::OpError,
73 ) -> Vec<EngineStep> {
74 let Some(pending) = self.exec.pending_async.remove(&cmd_id) else {
75 // No matching suspension - failure arrived for a
76 // CommandId the engine doesn't know. Silently drop;
77 // the host's transport reconciliation should have
78 // caught this earlier.
79 return Vec::new();
80 };
81 vec![self.fail_op(
82 pending.op_ref,
83 pending.exec_id,
84 crate::bus::OpErrorKind::RemoteFailed,
85 "completion_failed",
86 error.detail,
87 )]
88 }
89
90 /// Expire any pending async suspensions whose `deadline_ns` is
91 /// past `scheduler.now_ns()`. Each expired suspension fails via
92 /// the existing `OpFailed` surface with
93 /// `OpError("deadline exceeded")`. Returns the resulting steps.
94 /// Called from Phase 5 of the poll cycle before draining
95 /// `pending_completions`, so deadline failures land in the
96 /// same poll where they expire.
97 fn expire_deadlines(&mut self) -> Vec<EngineStep> {
98 let now_ns = self.framework.scheduler.now_ns();
99 let expired: Vec<CommandId> = self
100 .exec
101 .pending_async
102 .iter()
103 .filter_map(|(cmd, p)| match p.deadline_ns {
104 Some(d) if d <= now_ns => Some(*cmd),
105 _ => None,
106 })
107 .collect();
108 let mut steps = Vec::new();
109 for cmd in expired {
110 if let Some(p) = self.exec.pending_async.remove(&cmd) {
111 steps.push(self.fail_op(
112 p.op_ref,
113 p.exec_id,
114 crate::bus::OpErrorKind::Timeout,
115 "deadline_exceeded",
116 "deadline exceeded".to_string(),
117 ));
118 }
119 }
120
121 // Drain stale in-flight wire requests. Each evicted entry
122 // surfaces as `EngineStep::WireTimeout` for observability;
123 // if it carried a `parked_op`, fail the originator's local
124 // continuation with "chain timeout" so it doesn't sit
125 // parked forever.
126 let drained = self.framework.request_tracker.drain_stale(now_ns);
127 for (wire_req_id, entry) in drained {
128 steps.push(EngineStep::WireTimeout {
129 wire_req_id,
130 target_site: entry.target_site,
131 started_at_ns: entry.started_at_ns,
132 parked_op: entry.parked_op,
133 });
134 if let Some(cmd) = entry.parked_op {
135 if let Some(p) = self.exec.pending_async.remove(&cmd) {
136 steps.push(self.fail_op(
137 p.op_ref,
138 p.exec_id,
139 crate::bus::OpErrorKind::Timeout,
140 "chain_timeout",
141 "chain timeout".to_string(),
142 ));
143 }
144 }
145 }
146 steps
147 }
148
149 /// 8-phase poll cycle per ENGINE.md §7.
150 pub fn poll(&mut self) -> Vec<EngineStep> {
151 let _poll_span = tracing::debug_span!("engine.poll").entered();
152 // GC executions that finished in the previous cycle. The
153 // one-cycle delay lets the host read completion state via
154 // `slot_at` between polls; production consumers read via the
155 // `EngineStep` stream so the delay is invisible to them.
156 self.gc_completed_executions();
157 let mut steps = Vec::new();
158 // Per-poll counter used by `cycle_op_budget` enforcement.
159 // Increments once per `invoke_one` call across Phases 2, 6,
160 // and 7. When the budget is hit, the current drain loop
161 // breaks and `CycleBudgetExceeded` is appended once.
162 let mut ops_invoked: usize = 0;
163 let mut budget_exceeded = false;
164
165 // Drive any host-supplied bootstrap requests staged between
166 // polls. The host kicks the install-order queue via
167 // `Node::run_bootstrap` (which seeds before this poll runs)
168 // and stages targets-with-inputs via `enqueue_bootstrap_request`;
169 // both paths land here so disjoint targets fire alongside the
170 // already-seeded phase and overlapping ones park on
171 // `bootstrap.waiting` until `maybe_complete_bootstrap`
172 // promotes them. Install no longer auto-seeds — the host
173 // owns when bootstrap starts.
174 self.drive_pending_bootstrap_requests();
175 let bootstrap_was_pending = self.bootstrap.pending;
176 // Per-phase BootstrapComplete steps accumulate here as each
177 // queued key drains. The final `WaitingOnBootstrap` /
178 // terminal `BootstrapComplete` decision below uses
179 // `bootstrap_was_pending` + post-drain `bootstrap_pending` to
180 // detect a partial drain (queue not yet empty, async pending).
181 let mut bootstrap_phases_completed: usize = 0;
182
183 // While bootstrap is pending the ingress drain consumes only
184 // events that can advance the bootstrap call (its async
185 // completions, transport failures). Body-side events
186 // (AppEvent, EnvelopeFrom, Invoke) requeue so the host
187 // observes the same pre-bootstrap delivery order on the
188 // cycle after BootstrapComplete fires. The loop re-drains
189 // when bootstrap completes mid-pass so body events queued
190 // before bootstrap finished still process in this cycle.
191 //
192 // Snapshot the pre-drain depth so per-envelope handlers see
193 // the receiver's overload signal even after the drain runs the
194 // queue to zero.
195 self.phase1_pre_drain_depth = self.ingress.len();
196 {
197 let _phase1 = tracing::debug_span!("engine.phase1_ingress").entered();
198 loop {
199 let was_pending = self.bootstrap.pending;
200 let ingress_events = self.ingress.drain_all();
201 if ingress_events.is_empty() {
202 break;
203 }
204 for event in ingress_events {
205 if self.bootstrap.pending && self.is_body_phase_ingress(&event) {
206 let _ = self.ingress.push(event);
207 continue;
208 }
209 steps.extend(self.process_ingress_event(event));
210 if self.maybe_complete_bootstrap() {
211 bootstrap_phases_completed += 1;
212 self.seed_bootstrap_call();
213 }
214 }
215 if !was_pending || self.bootstrap.pending {
216 break;
217 }
218 }
219 }
220
221 {
222 let _phase2 = tracing::debug_span!("engine.phase2_frontier_drain").entered();
223 loop {
224 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
225 let step = self.invoke_one(op_ref, exec_id);
226 steps.push(step);
227 ops_invoked += 1;
228 if budget_hit(self.cycle_op_budget, ops_invoked) {
229 budget_exceeded = true;
230 break;
231 }
232 }
233 // If a queued bootstrap phase just drained, re-seed
234 // the next one and cascade in-cycle so the host sees
235 // every BootstrapComplete + the body's first ops in a
236 // single poll when budget permits.
237 if self.maybe_complete_bootstrap() {
238 bootstrap_phases_completed += 1;
239 if budget_exceeded {
240 break;
241 }
242 if !self.seed_bootstrap_call() {
243 break;
244 }
245 } else {
246 break;
247 }
248 }
249 }
250
251 // Before draining, surface any FIFO drops accumulated since
252 // the last poll as an `InfraEvent::BusOverflow`. Publishing
253 // before drain keeps the event in this cycle's routing pass.
254 let bus_dropped = self.bus.take_dropped_count();
255 if bus_dropped > 0 {
256 self.bus.publish(crate::bus::NodeEvent::Infra(
257 crate::bus::InfraEvent::BusOverflow { count: bus_dropped },
258 ));
259 }
260 // For every NodeEvent on the bus, derive its `kind` string
261 // (via NodeEvent::kind()) and look up the subscribed
262 // `NodeSiteId`s. For each site, write a `TriggerValue` at a
263 // fresh `ExecId` and push the site's downstream consumers
264 // onto the frontier. This matches the wire delivery
265 // semantics per `docs/ADDRESSING.md`.
266 let events = self.bus.drain();
267 if !events.is_empty() {
268 let mut to_seed: Vec<crate::ids::NodeSiteId> = Vec::new();
269 for event in events {
270 let kind = event.kind();
271 let Some(sites) = self.event_subscriptions.get(kind) else {
272 continue;
273 };
274 to_seed.extend(sites.iter().copied());
275 }
276 for site in to_seed {
277 let exec_id = self.allocate_exec_id();
278 let value: Box<dyn crate::slot_value::SlotValue> =
279 Box::new(crate::syscall::values::TriggerValue);
280 self.exec.slot_table.insert((site, exec_id), Some(value));
281 let consumers: Vec<crate::ids::OpRef> = self
282 .graphs_iter()
283 .filter_map(|g| g.consumers.get(&site).cloned())
284 .flatten()
285 .collect();
286 for op_ref in consumers {
287 self.exec.frontier.push_back((op_ref, exec_id));
288 }
289 }
290 }
291
292 let now_ns = self.framework.scheduler.now_ns();
293 let matured = self.framework.scheduler.poll_matured(now_ns);
294 for kind in matured {
295 self.handle_matured_timer(kind);
296 }
297
298 // Engine-side deadline scan runs first so an expired
299 // suspension fails this cycle even if a (now-stale)
300 // completion is also queued.
301 {
302 let _phase5 = tracing::debug_span!("engine.phase5_completions").entered();
303 steps.extend(self.expire_deadlines());
304 let completions = std::mem::take(&mut self.exec.pending_completions);
305 for c in completions {
306 steps.extend(self.handle_completion(c.cmd_id, c.results));
307 }
308 if self.maybe_complete_bootstrap() {
309 bootstrap_phases_completed += 1;
310 self.seed_bootstrap_call();
311 }
312 }
313
314 // tracker entry and publish bus events on state changes.
315 {
316 let now_ns = self.framework.scheduler.now_ns();
317 let transitions = self.framework.rtt_tracker.scan_phi(now_ns);
318 for transition in transitions {
319 let event = match transition {
320 crate::framework::rtt_tracker::PhiTransition::Suspect { site, phi } => {
321 crate::bus::InfraEvent::PeerSuspect { site, phi }
322 }
323 crate::framework::rtt_tracker::PhiTransition::Down { site, phi } => {
324 crate::bus::InfraEvent::PeerDown { site, phi }
325 }
326 crate::framework::rtt_tracker::PhiTransition::Live { site } => {
327 crate::bus::InfraEvent::PeerLive { site }
328 }
329 };
330 self.bus.publish(crate::bus::NodeEvent::Infra(event));
331 }
332 }
333
334 if !budget_exceeded {
335 loop {
336 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
337 let step = self.invoke_one(op_ref, exec_id);
338 steps.push(step);
339 ops_invoked += 1;
340 if budget_hit(self.cycle_op_budget, ops_invoked) {
341 budget_exceeded = true;
342 break;
343 }
344 }
345 if self.maybe_complete_bootstrap() {
346 bootstrap_phases_completed += 1;
347 if budget_exceeded {
348 break;
349 }
350 if !self.seed_bootstrap_call() {
351 break;
352 }
353 } else {
354 break;
355 }
356 }
357 }
358
359 // For each phase queued by the host via
360 // `Engine::fire_lifecycle(phase)`, push every enrolled
361 // `LifecyclePhase` op onto the frontier with a fresh ExecId
362 // and emit a `LifecycleFired` step. Cascade-drain so newly
363 // pushed ops invoke in this same poll cycle.
364 let fired: Vec<String> = std::mem::take(&mut self.fired_phases);
365 for phase in &fired {
366 let op_refs: Vec<crate::ids::OpRef> =
367 self.lifecycle_table.get(phase).cloned().unwrap_or_default();
368 let pairs: Vec<(crate::ids::OpRef, crate::ids::ExecId)> = op_refs
369 .into_iter()
370 .map(|op_ref| (op_ref, self.allocate_exec_id()))
371 .collect();
372 for (op_ref, exec_id) in pairs {
373 self.exec.frontier.push_back((op_ref, exec_id));
374 }
375 steps.push(EngineStep::LifecycleFired {
376 phase: phase.clone(),
377 });
378 }
379 if !budget_exceeded {
380 loop {
381 while let Some((op_ref, exec_id)) = self.pop_frontier_fireable() {
382 let step = self.invoke_one(op_ref, exec_id);
383 steps.push(step);
384 ops_invoked += 1;
385 if budget_hit(self.cycle_op_budget, ops_invoked) {
386 budget_exceeded = true;
387 break;
388 }
389 }
390 if self.maybe_complete_bootstrap() {
391 bootstrap_phases_completed += 1;
392 if budget_exceeded {
393 break;
394 }
395 if !self.seed_bootstrap_call() {
396 break;
397 }
398 } else {
399 break;
400 }
401 }
402 }
403
404 let _phase8 = tracing::debug_span!("engine.phase8_outbound").entered();
405 for env in self.framework.outbound_queue.drain_all() {
406 steps.push(EngineStep::SendEnvelope(env));
407 }
408 // Surface peer-resolution failures captured by the wire
409 // syscall during this poll. Each entry becomes a dedicated
410 // EngineStep::PeerResolveFailed; the matching bus event was
411 // already published by the syscall, so subscribers got the
412 // routable mirror in real time.
413 for (peer, op_ref) in self.framework.pending_peer_resolve_failures.drain(..) {
414 steps.push(EngineStep::PeerResolveFailed {
415 peer,
416 op_ref,
417 exec_id: crate::ids::ExecId::from(0u64),
418 });
419 }
420 // Emit a single `OutboundDropped` step capturing FIFO drops
421 // that accumulated since the previous poll (e.g., when push
422 // exceeded `max_outbound_queue`).
423 let dropped = self.framework.outbound_queue.take_dropped_count();
424 if dropped > 0 {
425 steps.push(EngineStep::OutboundDropped { count: dropped });
426 }
427 // Surface a single `CycleBudgetExceeded` at the end so the
428 // host knows to re-poll. The step is appended after all
429 // observable per-op steps so the budget signal trails the
430 // in-cycle work it bounded.
431 if budget_exceeded {
432 steps.push(EngineStep::CycleBudgetExceeded { ops_invoked });
433 }
434 // Bootstrap state observable. Each queued bootstrap key that
435 // drained during this cycle emits its own `BootstrapComplete`
436 // — multi-target installs surface one signal per target in
437 // install order. `WaitingOnBootstrap` lands when the
438 // *currently* in-flight bootstrap op suspended on async
439 // completion and the host must drive the resumption before
440 // re-polling. Body-phase ops are gated from firing while
441 // `bootstrap_pending` is set.
442 if bootstrap_was_pending {
443 for _ in 0..bootstrap_phases_completed {
444 steps.push(EngineStep::BootstrapComplete);
445 }
446 if self.bootstrap.pending {
447 steps.push(EngineStep::WaitingOnBootstrap);
448 }
449 }
450 // Drain AppEmit / AppNotify syscall outputs into
451 // EngineStep::AppEvent. `Emit` carries serialized payload
452 // bytes; `Notify` is a marker-only event (empty `value_bytes`).
453 for ev in std::mem::take(&mut self.framework.pending_app_events) {
454 let (module_name, topic, value_bytes) = match ev {
455 crate::bus::AppEvent::Emit { name, value_bytes } => {
456 (String::new(), name, value_bytes)
457 }
458 crate::bus::AppEvent::Notify { name } => (String::new(), name, Vec::new()),
459 };
460 steps.push(EngineStep::AppEvent {
461 module_name,
462 topic,
463 value_bytes,
464 });
465 }
466
467 steps
468 }
469
470 /// Whether an ingress event would seed body-phase work. The
471 /// bootstrap gate requeues these while the bootstrap call is
472 /// outstanding so app-event delivery + envelope routing
473 /// observe the post-bootstrap engine state. Bootstrap-resuming
474 /// completions, transport failures, and host-injected timer
475 /// matures bypass the gate so the bootstrap call can progress.
476 fn is_body_phase_ingress(&self, event: &IngressEvent) -> bool {
477 matches!(
478 event,
479 IngressEvent::AppEvent { .. }
480 | IngressEvent::EnvelopeFrom { .. }
481 | IngressEvent::Invoke { .. }
482 )
483 }
484
485 /// Ingress-event router. Dispatches each event variant to its
486 /// handler: envelopes route to `deliver_inbound_internal`,
487 /// completions to `handle_completion`, app events to the bus,
488 /// matured timers to `handle_matured_timer`, invoke events to
489 /// the per-module entry point.
490 fn process_ingress_event(&mut self, event: IngressEvent) -> Vec<EngineStep> {
491 match event {
492 IngressEvent::Completion { cmd_id, results } => {
493 // The host's transport pre-decodes opaque payloads
494 // and hands them as `Vec<Vec<u8>>`. The engine wraps
495 // each entry as a `BytesValue` and forwards to
496 // `handle_completion`, which writes the slots and
497 // pushes downstream consumers.
498 let typed_results: Vec<(String, Box<dyn crate::slot_value::SlotValue>)> = results
499 .into_iter()
500 .enumerate()
501 .map(|(i, bytes)| {
502 let value = crate::syscall::values::BytesValue(bytes);
503 (
504 format!("out_{i}"),
505 Box::new(value) as Box<dyn crate::slot_value::SlotValue>,
506 )
507 })
508 .collect();
509 self.handle_completion(cmd_id, typed_results)
510 }
511 IngressEvent::EnvelopeFrom {
512 src_peer,
513 envelope,
514 src_observed_address,
515 } => {
516 // Backpressure pre-flight: silent-drop senders bypass
517 // dispatch; others route normally and then check the
518 // post-pop ingress depth against the high-water mark
519 // to emit a single BackoffNotice.
520 if self
521 .framework
522 .peer_state
523 .backpressure
524 .is_silent_drop_active(src_peer)
525 {
526 return Vec::new();
527 }
528 // Order: claimed (envelope) first so the entry
529 // exists for the observed-address registration step.
530 // Observed wins for NAT-translated cases because it
531 // appends a fresh address the claimed snapshot
532 // cannot know.
533 //
534 // The address-book hint is best-effort under allocator
535 // pressure: if the dedup buffer cannot be reserved we
536 // drop the hint, emit `WireReceiveError::AllocationFailed`,
537 // and continue routing — fills do not depend on the
538 // address book.
539 let mut steps = Vec::new();
540 if let Err(alloc) =
541 self.merge_src_peer_addresses(src_peer, &envelope.src_peer_addresses)
542 {
543 steps.push(self.emit_wire_receive_error(
544 Some(src_peer),
545 0,
546 0,
547 alloc.byte_count,
548 crate::bus::WireReceiveErrorKind::AllocationFailed {
549 byte_count: alloc.byte_count,
550 reason: alloc.reason,
551 },
552 ));
553 }
554 if let Some(observed) = src_observed_address {
555 self.merge_src_observed_address(src_peer, observed);
556 }
557 steps.extend(self.route_envelope(envelope, Some(src_peer)));
558 let backoff_steps = self.maybe_emit_backoff_notice(
559 src_peer,
560 crate::framework::BackoffCause::QueueFull,
561 None,
562 );
563 steps.extend(backoff_steps);
564 steps
565 }
566 IngressEvent::AppEvent {
567 module_name,
568 input_name,
569 value_bytes,
570 } => self.deliver_app_event(module_name, input_name, value_bytes),
571 IngressEvent::Invoke {
572 module_name,
573 inputs,
574 exec_id,
575 } => self.deliver_invoke(module_name, inputs, exec_id),
576 IngressEvent::TimerMatured { at_ns } => {
577 self.framework.scheduler.set_now(at_ns);
578 let matured = self.framework.scheduler.poll_matured(at_ns);
579 let mut out = Vec::new();
580 for kind in matured {
581 out.extend(self.handle_matured_timer(kind));
582 }
583 out
584 }
585 IngressEvent::CompletionFailed { cmd_id, detail } => {
586 // Async-completion FAILURE: route to the typed
587 // OpFailed path via handle_completion_failed so the
588 // parked op fails as itself rather than as a
589 // success-bytes payload.
590 self.handle_completion_failed(
591 cmd_id,
592 crate::bus::OpError {
593 detail,
594 ..Default::default()
595 },
596 )
597 }
598 IngressEvent::SendFailed {
599 wire_req_id,
600 peer: _peer,
601 reason: _reason,
602 } => {
603 // Transport-side delivery failure. Consumes the
604 // in-flight registration so the request tracker doesn't
605 // leak the entry; the parked originator op's failure is
606 // surfaced by the wire-timeout drain (`drain_stale`)
607 // rather than waiting for the TTL to elapse.
608 let now_ns = self.framework.scheduler.now_ns();
609 let _ = self
610 .framework
611 .request_tracker
612 .observe_response(wire_req_id, now_ns);
613 Vec::new()
614 }
615 IngressEvent::AppIngressError {
616 source,
617 byte_count,
618 kind,
619 } => {
620 // Cross-thread bridge for `CompletionSink::complete`
621 // exceeding the per-completion result cap. Re-publish
622 // on the bus so subscribers see the rejection
623 // alongside the synchronous emissions from
624 // `Node::deliver_event` / `Node::invoke`.
625 self.bus.publish(crate::bus::NodeEvent::Infra(
626 crate::bus::InfraEvent::AppIngressError {
627 source,
628 byte_count,
629 kind,
630 },
631 ));
632 Vec::new()
633 }
634 }
635 }
636
637 /// Deliver an inbound `AppEvent`: look up the addressed graph,
638 /// resolve the `input_name` to a `NodeSiteId`, wrap the bytes as
639 /// a `BytesValue`, seed the slot at a fresh `ExecId`, and push
640 /// ready downstream consumers onto the frontier. Surfaces an
641 /// observable `EngineStep::AppEvent` so the host can confirm
642 /// delivery even if no consumer exists yet.
643 fn deliver_app_event(
644 &mut self,
645 module_name: String,
646 input_name: String,
647 value_bytes: Vec<u8>,
648 ) -> Vec<EngineStep> {
649 let step = EngineStep::AppEvent {
650 module_name: module_name.clone(),
651 topic: input_name.clone(),
652 value_bytes: value_bytes.clone(),
653 };
654 let Some(graph) = self.graph(&module_name) else {
655 return vec![step];
656 };
657 let Some(&site) = graph.site_names.get(&input_name) else {
658 return vec![step];
659 };
660 let exec_id = self.allocate_exec_id();
661 let value = crate::syscall::values::BytesValue(value_bytes);
662 self.exec
663 .slot_table
664 .insert((site, exec_id), Some(Box::new(value)));
665 self.push_ready_consumers(&[site], exec_id);
666 vec![step]
667 }
668
669 /// Deliver an inbound `Invoke`: seed every `(input_name,
670 /// value_bytes)` pair into the addressed graph's matching site
671 /// at the supplied `exec_id`, then push the ready consumers
672 /// onto the frontier. Unknown modules / unknown input names are
673 /// silent no-ops (the host can detect via subsequent polls
674 /// producing no steps).
675 fn deliver_invoke(
676 &mut self,
677 module_name: String,
678 inputs: Vec<(String, Vec<u8>)>,
679 exec_id: crate::ids::ExecId,
680 ) -> Vec<EngineStep> {
681 let Some(graph) = self.graph(&module_name) else {
682 return Vec::new();
683 };
684 let mut seeded_sites: Vec<crate::ids::NodeSiteId> = Vec::new();
685 let pairs: Vec<(crate::ids::NodeSiteId, Vec<u8>)> = inputs
686 .into_iter()
687 .filter_map(|(name, bytes)| graph.site_names.get(&name).map(|&site| (site, bytes)))
688 .collect();
689 for (site, bytes) in pairs {
690 let value = crate::syscall::values::BytesValue(bytes);
691 self.exec
692 .slot_table
693 .insert((site, exec_id), Some(Box::new(value)));
694 seeded_sites.push(site);
695 }
696 if !seeded_sites.is_empty() {
697 self.push_ready_consumers(&seeded_sites, exec_id);
698 }
699 Vec::new()
700 }
701
702 /// Route a single inbound envelope. Iterates each `SlotFill` and
703 /// dispatches it via its multiaddr `dest_suffix` per
704 /// `docs/ADDRESSING.md`. The receiver doesn't consult any
705 /// subscription table or routing map - the address suffix is the
706 /// routing key. Two suffix shapes are supported:
707 /// - `/site/<NodeSiteId>` → data-plane slot fill.
708 /// - `/component/<cref>/op/<name>` → control-plane component
709 /// dispatch.
710 fn route_envelope(
711 &mut self,
712 env: crate::envelope::WireEnvelope,
713 src_peer: Option<crate::ids::PeerId>,
714 ) -> Vec<EngineStep> {
715 // Sender-side back-pressure ingest. Inbound envelopes whose
716 // first fill carries the reserved
717 // `BackoffNoticePayload` type-hash are framework-internal -
718 // intercept them here, decode the payload, advise the
719 // sender-side `BackoffTable`, and short-circuit the normal
720 // data-plane / control-plane delivery so user Components
721 // never observe a notice envelope.
722 if env
723 .fills
724 .first()
725 .is_some_and(|f| f.type_hash == crate::framework::backoff_notice_type_hash())
726 {
727 return self.ingest_backoff_notice(env, src_peer);
728 }
729
730 let correlation = env.correlation.as_ref().map(|c| c.wire_req_id).unwrap_or(0);
731
732 // hook. If the inbound envelope's wire_req_id matches an
733 // in-flight outbound round trip we registered at dispatch,
734 // pop the sample + feed it into the RTT tracker so the
735 // hierarchical-fallback EMAs converge on real per-edge,
736 // per-site, per-chain, and global RTT distributions.
737 let mut response_from_site: Option<crate::ids::NodeSiteId> = None;
738 if correlation != 0 {
739 let now_ns = self.framework.scheduler.now_ns();
740 if let Some(sample) = self
741 .framework
742 .request_tracker
743 .observe_response(correlation, now_ns)
744 {
745 self.framework.rtt_tracker.observe_round_trip(
746 sample.target_site,
747 sample.chain,
748 sample.elapsed_ns,
749 now_ns,
750 );
751 response_from_site = Some(sample.target_site);
752 }
753 }
754
755 // piggyback. The sender attached EdgeRttReport entries
756 // describing its observed outgoing edges in the chain. We
757 // record each report against the entry for the sending
758 // site so multi-hop chain budgets can compose from this
759 // direct neighbor's table.
760 if let Some(from_site) = response_from_site {
761 for report in &env.edge_rtt_reports {
762 self.framework.rtt_tracker.ingest_reported_outgoing(
763 from_site,
764 crate::ids::NodeSiteId::from(report.next_hop_site_id),
765 report.chain_id,
766 report.srtt_ns,
767 report.rttvar_ns,
768 report.sample_count,
769 );
770 }
771 }
772
773 // Capture the inbound envelope's deadline budget + arrival
774 // timestamp so consumer ops (especially `wire.Send` while
775 // forwarding) can propagate them per Dapper.
776 let inbound_remaining_deadline_ns = if env.remaining_deadline_ns > 0 {
777 Some(env.remaining_deadline_ns)
778 } else {
779 None
780 };
781 let arrival_ns = self.framework.scheduler.now_ns();
782
783 let mut steps = Vec::new();
784 for (fill_index, fill) in env.fills.into_iter().enumerate() {
785 steps.extend(self.deliver_fill(
786 fill,
787 fill_index as u32,
788 correlation,
789 src_peer,
790 arrival_ns,
791 inbound_remaining_deadline_ns,
792 ));
793 }
794 steps
795 }
796
797 /// Dispatch one `SlotFill` per `docs/ADDRESSING.md`. Parses
798 /// `fill.dest_suffix` as a multiaddr and routes by the trailing
799 /// segment shape. `fill_index` is the fill's 0-based position
800 /// within the inbound envelope; surfaces on per-fill failure
801 /// events so subscribers can identify the failing fill when
802 /// the envelope partial-delivers.
803 fn deliver_fill(
804 &mut self,
805 fill: crate::envelope::SlotFill,
806 fill_index: u32,
807 wire_req_id: u64,
808 src_peer: Option<crate::ids::PeerId>,
809 arrival_ns: u64,
810 inbound_remaining_deadline_ns: Option<u64>,
811 ) -> Vec<EngineStep> {
812 let addr = match crate::framework::Address::from_bytes(&fill.dest_suffix) {
813 Ok(a) => a,
814 Err(e) => {
815 return vec![self.emit_wire_decode_failure(
816 0,
817 fill.payload.len(),
818 format!("dest_suffix parse: {e}"),
819 )];
820 }
821 };
822
823 // Data-plane suffix: /site/<NodeSiteId>. The Site segment
824 // uniquely identifies the slot (NodeSiteIds are globally
825 // unique within a Node).
826 if let Some(site_id) = addr.site_id() {
827 return self.deliver_data_plane_fill(
828 site_id,
829 fill,
830 fill_index,
831 src_peer,
832 wire_req_id,
833 arrival_ns,
834 inbound_remaining_deadline_ns,
835 );
836 }
837
838 // Control-plane suffix: /component/<cref>/op/<name>.
839 if let (Some(cref), Some(op_name)) = (addr.component_ref(), addr.op_name()) {
840 let op_name = op_name.to_string();
841 return self.deliver_control_plane_fill(cref, op_name, fill, wire_req_id);
842 }
843
844 vec![self.emit_wire_decode_failure(
845 0,
846 fill.payload.len(),
847 "address shape neither data-plane nor control-plane".to_string(),
848 )]
849 }
850
851 /// Publish a `WireDecodeFailure` onto the bus and return the
852 /// matching `EngineStep`. The bus event lets in-process
853 /// telemetry subscribers react; the EngineStep surfaces the
854 /// same context to the host poll() caller.
855 fn emit_wire_decode_failure(
856 &mut self,
857 hash: u64,
858 payload_size: usize,
859 detail: String,
860 ) -> EngineStep {
861 self.bus.publish(crate::bus::NodeEvent::Infra(
862 crate::bus::InfraEvent::WireDecodeFailure {
863 hash,
864 payload_size,
865 detail: detail.clone(),
866 },
867 ));
868 EngineStep::WireDecodeFailed {
869 hash,
870 payload_size,
871 detail,
872 }
873 }
874
875 /// Data-plane delivery: decode the fill payload into a typed
876 /// `SlotValue` via the wire decoder registry, write it into the
877 /// addressed slot at a fresh `ExecId`, and push the slot's
878 /// downstream consumers onto the frontier. Walks each installed
879 /// graph's `consumers` map for the matching `NodeSiteId`.
880 ///
881 /// Per-fill failures (unknown type-hash, type mismatch against
882 /// the slot's declared wire type, decoder error) surface as a
883 /// `WireReceiveError` InfraEvent + matching `WireReceiveFailed`
884 /// EngineStep. The failing fill drops; sibling fills in the
885 /// same envelope still deliver (the caller continues iterating).
886 #[allow(clippy::too_many_arguments)]
887 fn deliver_data_plane_fill(
888 &mut self,
889 site_id: crate::ids::NodeSiteId,
890 mut fill: crate::envelope::SlotFill,
891 fill_index: u32,
892 src_peer: Option<crate::ids::PeerId>,
893 wire_req_id: u64,
894 arrival_ns: u64,
895 inbound_remaining_deadline_ns: Option<u64>,
896 ) -> Vec<EngineStep> {
897 // Resolve the consumer ops from each installed graph; a
898 // NodeSiteId belongs to at most one graph because IDs are
899 // globally unique, but we tolerate empty lookups.
900 let consumers: Vec<crate::ids::OpRef> = self
901 .graphs_iter()
902 .filter_map(|g| g.consumers.get(&site_id).cloned())
903 .flatten()
904 .collect();
905
906 // Resolve the typed `SlotValue` BEFORE allocating an
907 // ExecId or stamping inbound context: failure modes return
908 // a WireReceiveError step and the envelope's other fills
909 // continue to deliver without polluting the slot table.
910 // Trigger fills bypass the decoder lookup entirely.
911 let value: Box<dyn crate::slot_value::SlotValue> = if fill.trigger_only {
912 Box::new(crate::syscall::values::TriggerValue)
913 } else {
914 match self.decode_typed_fill(&mut fill, fill_index, site_id, src_peer) {
915 Ok(v) => v,
916 Err(step) => return vec![step],
917 }
918 };
919
920 let exec_id = self.allocate_exec_id();
921 // Stamp the inbound envelope context for this ExecId.
922 // Components access this through `RuntimeResourceRef` (RX
923 // gates filter on `src_peer`; `wire.Send` forwarding inside a
924 // chain reuses `wire_req_id` + propagates `remaining_deadline_ns`
925 // minus elapsed local time).
926 self.framework.inbound_contexts.insert(
927 exec_id,
928 crate::framework::InboundContext {
929 src_peer,
930 wire_req_id: if wire_req_id != 0 {
931 Some(wire_req_id)
932 } else {
933 None
934 },
935 arrival_ns: Some(arrival_ns),
936 remaining_deadline_ns: inbound_remaining_deadline_ns,
937 },
938 );
939 // `slot_write` routes through the engine's budget-release
940 // bookkeeping so a wire-receive overwriting a prior carrier
941 // releases the prior `charged_bytes()` against
942 // `ingress_bytes_in_flight`. Fresh-slot writes (the common
943 // case here, since `exec_id` is freshly allocated) hit the
944 // no-prior branch and incur the same cost as the raw
945 // `slot_table.insert`.
946 self.slot_write(site_id, exec_id, value);
947
948 // If site_id is a wire.Recv's payload site, also populate the
949 // paired sender site with PeerIdValue(src_peer) for the same
950 // ExecId. Downstream user ops read this as a graph value to
951 // identify provenance; reply-to is `g.net_out(name, sender, reply)`.
952 let sender_site: Option<crate::ids::NodeSiteId> = self
953 .graphs_iter()
954 .find_map(|g| g.recv_sender_sites.get(&site_id).copied());
955 if let (Some(sender_site), Some(peer)) = (sender_site, src_peer) {
956 let sender_value: Box<dyn crate::slot_value::SlotValue> =
957 Box::new(crate::syscall::values::PeerIdValue(peer));
958 self.exec
959 .slot_table
960 .insert((sender_site, exec_id), Some(sender_value));
961 }
962
963 for op_ref in consumers {
964 self.exec.frontier.push_back((op_ref, exec_id));
965 }
966 Vec::new()
967 }
968
969 /// Resolve a non-trigger data-plane fill into its typed
970 /// `SlotValue` carrier. The routing tree branches on the
971 /// destination slot's binding:
972 ///
973 /// - **Backend-bound slot** — the engine takes ownership of
974 /// `fill.payload` via `std::mem::take` (zero-copy ownership
975 /// transfer; `fill.payload` is already framework-owned from
976 /// envelope decode) and hands it to the bound backend's
977 /// `materialize_from_wire`. The typed tensor lands inside a
978 /// `BackendTensorCarrier` whose `charged_bytes` + `backend_ref`
979 /// are stamped to the engine-side accounting before the carrier
980 /// is returned for slot-table install.
981 /// - **Framework-carrier slot** — the wire decoder registry
982 /// resolves the `type_hash` to a bincode decoder; the decoded
983 /// `SlotValue` rides on as-is.
984 ///
985 /// Failure modes surface as typed `WireReceiveError` InfraEvents
986 /// + matching `WireReceiveFailed` EngineSteps:
987 ///
988 /// - **TypeMismatch** — destination slot declares an expected
989 /// wire-type hash via `GraphSlot::recv_wire_type_hash` and
990 /// the fill's `type_hash` does not match. Checked first so a
991 /// mis-typed payload never reaches the decoder.
992 /// - **UnknownTypeHash** — framework-carrier path only; no
993 /// decoder is registered for the stamped hash.
994 /// - **DecodeFailed** — registered decoder ran and rejected
995 /// the bytes (framework-carrier path).
996 /// - **BudgetExceeded** — admitting the bytes would push the
997 /// engine over `NodeConfig::ingress_byte_budget`.
998 /// - **BackendMaterializeFailed** — the bound backend's typed
999 /// `materialize_from_wire` returned `Err` (backend path).
1000 ///
1001 /// The `Err` carries the typed `EngineStep` the caller will
1002 /// surface to `Engine::poll`'s return value. `EngineStep` is
1003 /// load-bearing for the host's failure visibility surface, so
1004 /// boxing it here would force every caller through an
1005 /// indirection layer that adds no clarity for a single
1006 /// internal call site.
1007 ///
1008 /// **No per-fill scratch buffer.** Backend-mediated tensor fills
1009 /// move `fill.payload` into `materialize_from_wire` via
1010 /// `std::mem::take`; the empty `Vec<u8>` left behind drops with
1011 /// the `SlotFill`. The only memcpy on the tensor path is the
1012 /// one the backend chooses to do — or skips via zero-copy
1013 /// adoption (`ArrayD::from_shape_vec` when alignment permits).
1014 #[allow(clippy::result_large_err)]
1015 fn decode_typed_fill(
1016 &mut self,
1017 fill: &mut crate::envelope::SlotFill,
1018 fill_index: u32,
1019 site_id: crate::ids::NodeSiteId,
1020 src_peer: Option<crate::ids::PeerId>,
1021 ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1022 let expected_hash: Option<u64> = self
1023 .graphs_iter()
1024 .find_map(|g| g.recv_wire_type_hash.get(&site_id).copied());
1025 if let Some(expected) = expected_hash {
1026 if expected != fill.type_hash {
1027 return Err(self.emit_wire_receive_error(
1028 src_peer,
1029 fill_index,
1030 fill.type_hash,
1031 fill.payload.len(),
1032 crate::bus::WireReceiveErrorKind::TypeMismatch {
1033 expected_hash: expected,
1034 },
1035 ));
1036 }
1037 }
1038
1039 // Resolve the destination slot's binding so the backend-bound
1040 // branch can fire before the framework-carrier registry
1041 // lookup. A site without a recv_site → slot_id mapping (no
1042 // role consumes the Recv payload) takes the framework path.
1043 let slot_id = self
1044 .graphs_iter()
1045 .find_map(|g| g.recv_site_to_slot_id.get(&site_id).copied());
1046 let role_ref = slot_id.and_then(|id| self.role_ref_for_slot_id(id));
1047
1048 // Budget guard (uniform across branches): pre-charge the
1049 // fill's payload length against `NodeConfig::ingress_byte_budget`
1050 // before invoking either decoder. Successful admission leaves
1051 // the charge in the counter; failure paths release before
1052 // returning.
1053 let byte_count = fill.payload.len();
1054 if let Err(reason) = self.try_charge(byte_count) {
1055 return Err(self.emit_wire_receive_error(
1056 src_peer,
1057 fill_index,
1058 fill.type_hash,
1059 byte_count,
1060 crate::bus::WireReceiveErrorKind::BudgetExceeded {
1061 byte_count: reason.byte_count,
1062 budget_remaining: reason.budget_remaining,
1063 },
1064 ));
1065 }
1066
1067 if let Some((crate::registry::ComponentRole::Backend, backend_ref)) = role_ref {
1068 return self.materialize_via_backend(
1069 fill,
1070 fill_index,
1071 src_peer,
1072 byte_count,
1073 backend_ref,
1074 );
1075 }
1076
1077 let Some(decoder) = crate::slot_value::wire_decoder_registry()
1078 .get(&fill.type_hash)
1079 .copied()
1080 else {
1081 self.release(byte_count);
1082 return Err(self.emit_wire_receive_error(
1083 src_peer,
1084 fill_index,
1085 fill.type_hash,
1086 fill.payload.len(),
1087 crate::bus::WireReceiveErrorKind::UnknownTypeHash,
1088 ));
1089 };
1090 decoder(&fill.payload).map_err(|e| {
1091 self.release(byte_count);
1092 self.emit_wire_receive_error(
1093 src_peer,
1094 fill_index,
1095 fill.type_hash,
1096 byte_count,
1097 crate::bus::WireReceiveErrorKind::DecodeFailed {
1098 error_summary: e.to_string(),
1099 },
1100 )
1101 })
1102 }
1103
1104 /// Backend-bound branch of [`Self::decode_typed_fill`]. Hands the
1105 /// inbound bytes to the bound backend via the role-dispatch
1106 /// registry; wraps the typed `Self::Tensor` in a
1107 /// `BackendTensorCarrier` whose engine-side accounting fields
1108 /// (`charged_bytes`, `backend_ref`) are stamped before the
1109 /// carrier is returned. On error releases the byte charge and
1110 /// emits `BackendMaterializeFailed`.
1111 #[allow(clippy::result_large_err)]
1112 fn materialize_via_backend(
1113 &mut self,
1114 fill: &mut crate::envelope::SlotFill,
1115 fill_index: u32,
1116 src_peer: Option<crate::ids::PeerId>,
1117 byte_count: usize,
1118 backend_ref: crate::ids::ComponentRef,
1119 ) -> Result<Box<dyn crate::slot_value::SlotValue>, EngineStep> {
1120 // `mem::take` transfers ownership of `fill.payload` to the
1121 // backend at zero cost: the wire bytes are already
1122 // framework-owned (prost allocated them during envelope
1123 // decode), and the empty `Vec<u8>` left in `fill.payload`
1124 // drops with the `SlotFill`. No scratch buffer, no memcpy on
1125 // the framework side.
1126 let bytes = std::mem::take(&mut fill.payload);
1127 let type_hash = fill.type_hash;
1128
1129 // Take the backend component out of the Vec so dispatch can
1130 // borrow it without holding a long lease on `engine.components`.
1131 // Restore on the way out — even on error paths.
1132 let Some(mut taken) = self.take_component(backend_ref) else {
1133 self.release(byte_count);
1134 return Err(self.emit_wire_receive_error(
1135 src_peer,
1136 fill_index,
1137 type_hash,
1138 byte_count,
1139 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1140 backend_ref,
1141 backend_error_summary: "backend component slot empty".to_string(),
1142 },
1143 ));
1144 };
1145
1146 // Reach the backend through the per-T dispatcher in
1147 // `role_dispatchers`. The dispatcher closes over the typed
1148 // `Self::Tensor` so the boxed `SlotValue` returned here is
1149 // already a `BackendTensorCarrier` (the derive bridge does
1150 // the wrap).
1151 let any: &mut dyn std::any::Any = taken.as_mut();
1152 let tid = (*any).type_id();
1153 let dispatcher = self.role_dispatchers.get(&tid).map(|d| d.materialize);
1154
1155 let result = if let Some(materialize) = dispatcher {
1156 (materialize)(any, type_hash, bytes)
1157 } else {
1158 Err(crate::slot_value::BackendMaterializeError {
1159 summary: "no BackendRuntime dispatcher registered".to_string(),
1160 })
1161 };
1162
1163 self.restore_component(backend_ref, taken);
1164
1165 match result {
1166 Ok(boxed) => {
1167 // Downcast to `BackendTensorCarrier` and stamp the
1168 // engine-side accounting fields the derive bridge left
1169 // as placeholders. The bridge constructs the carrier
1170 // with the typed clone / encode fn pointers; the
1171 // engine owns the budget counter and the backend
1172 // identity, so it fills them in here. The downcast
1173 // is infallible by the bridge's construction; any
1174 // hand-rolled `BackendRuntime::materialize_from_wire`
1175 // that returns a non-carrier `SlotValue` flows
1176 // through unchanged (no accounting stamp), which is
1177 // the right behaviour because non-carrier returns
1178 // never charge against the backend-tensor pool.
1179 let any_box = boxed.into_any_boxed();
1180 let final_boxed: Box<dyn crate::slot_value::SlotValue> =
1181 match any_box.downcast::<crate::slot_value::BackendTensorCarrier>() {
1182 Ok(mut carrier) => {
1183 carrier.charged_bytes = byte_count;
1184 carrier.backend_ref = backend_ref;
1185 carrier
1186 }
1187 Err(other) => {
1188 // The dispatcher returned a non-carrier
1189 // `SlotValue`; route it through unchanged.
1190 // `Box<dyn Any + Send + Sync>` downcasts
1191 // back to `Box<dyn SlotValue>` only via a
1192 // typed re-box, which we don't do — log
1193 // and release the budget charge instead.
1194 let _ = other;
1195 self.release(byte_count);
1196 return Err(self.emit_wire_receive_error(
1197 src_peer,
1198 fill_index,
1199 type_hash,
1200 byte_count,
1201 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1202 backend_ref,
1203 backend_error_summary:
1204 "backend bridge returned non-carrier SlotValue".to_string(),
1205 },
1206 ));
1207 }
1208 };
1209 Ok(final_boxed)
1210 }
1211 Err(e) => {
1212 self.release(byte_count);
1213 Err(self.emit_wire_receive_error(
1214 src_peer,
1215 fill_index,
1216 type_hash,
1217 byte_count,
1218 crate::bus::WireReceiveErrorKind::BackendMaterializeFailed {
1219 backend_ref,
1220 backend_error_summary: e.summary,
1221 },
1222 ))
1223 }
1224 }
1225 }
1226
1227 /// Publish a `WireReceiveError` on the bus and return the
1228 /// matching `EngineStep`. Mirrors `emit_wire_decode_failure`
1229 /// for the per-fill typed-decode failure surface.
1230 fn emit_wire_receive_error(
1231 &mut self,
1232 src_peer: Option<crate::ids::PeerId>,
1233 fill_index: u32,
1234 actual_hash: u64,
1235 payload_size: usize,
1236 kind: crate::bus::WireReceiveErrorKind,
1237 ) -> EngineStep {
1238 self.bus.publish(crate::bus::NodeEvent::Infra(
1239 crate::bus::InfraEvent::WireReceiveError {
1240 src_peer,
1241 fill_index,
1242 actual_hash,
1243 payload_size,
1244 kind: kind.clone(),
1245 },
1246 ));
1247 EngineStep::WireReceiveFailed {
1248 src_peer,
1249 fill_index,
1250 actual_hash,
1251 payload_size,
1252 kind,
1253 }
1254 }
1255
    /// Control-plane delivery: invoke
    /// `component[cref].dispatch_atomic(op_name, [(payload,
    /// correlation, ...)], ctx)`. The component decodes the payload
    /// bytes via its own protocol logic.
    ///
    /// Inputs passed to the component:
    /// - `"payload"`: `BytesValue` wrapping the raw `fill.payload` bytes;
    /// - `"correlation"`: `WireReqIdValue(wire_req_id)`.
    ///
    /// Behaviour visible in this function:
    /// - If `take_component(component_ref)` yields nothing, the fill is dropped.
    ///   The function returns an empty `Vec` without dispatching or publishing
    ///   anything.
    /// - The return value of `call_protocol_dispatch_atomic` is discarded.
    /// - Completions the component queued on `ctx.current.pending_completions` are
    ///   moved onto `self.exec.pending_completions`.
    /// - The component is restored afterwards.
    /// - Always returns an empty `Vec`.
    fn deliver_control_plane_fill(
        &mut self,
        component_ref: crate::ids::ComponentRef,
        op_name: String,
        fill: crate::envelope::SlotFill,
        wire_req_id: u64,
    ) -> Vec<EngineStep> {
        let payload = BytesValue(fill.payload);
        let correlation = WireReqIdValue(wire_req_id);

        // Owned inputs; `inputs_for_dispatch` below borrows from this Vec.
        let inputs_storage: Vec<(String, Box<dyn SlotValue>)> = vec![
            ("payload".to_string(), Box::new(payload)),
            ("correlation".to_string(), Box::new(correlation)),
        ];

        // Borrowed `(name, &value)` view in the shape the dispatch call takes.
        let inputs_for_dispatch: Vec<(&str, &dyn SlotValue)> = inputs_storage
            .iter()
            .map(|(n, h)| (n.as_str(), h.as_ref()))
            .collect();

        // D2 take-and-restore so we can split-borrow the rest of
        // `self.framework` / `self.bus` etc. while the dispatching
        // component is held exclusively.
        let Some(mut taken) = self.take_component(component_ref) else {
            return Vec::new();
        };

        // Runtime context built from disjoint field borrows of `self`.
        let mut ctx = crate::runtime::RuntimeResourceRef {
            peers: crate::runtime::PeerCtx {
                gate: &mut self.framework.peer_state.gate,
                backoff: &mut self.framework.peer_state.backoff,
                governor: &mut self.framework.peer_state.governor,
                addresses: &mut self.framework.address_book,
                backpressure: &mut self.framework.peer_state.backpressure,
            },
            net: crate::runtime::NetCtx {
                outbound: &mut self.framework.outbound_queue,
                rtt: &mut self.framework.rtt_tracker,
                requests: &mut self.framework.request_tracker,
                dedup: &mut self.framework.inbound_dedup,
                pending_peer_resolve_failures: &mut self.framework.pending_peer_resolve_failures,
            },
            time: crate::runtime::TimeCtx {
                scheduler: &mut self.framework.scheduler,
            },
            syscall: crate::runtime::SyscallCtx {
                serialize_queue: &mut self.framework.serialize_queue,
                hold_table: &mut self.framework.hold_table,
                record_buffer: &mut self.framework.record_buffer,
                event_source: &mut self.framework.event_source,
                counters: &mut self.framework.counters,
                any_fired_groups: &mut self.framework.any_fired_groups,
                deadline_match_fired: &mut self.framework.deadline_match_fired,
                rng: &mut *self.framework.rng,
                pending_app_events: &mut self.framework.pending_app_events,
            },
            bus: &mut self.bus,
            ingress: std::sync::Arc::clone(&self.ingress),
            components: crate::runtime::ComponentsView {
                instances: Some(&self.components),
                slots: Some(&self.slots),
            },
            // Placeholder call identity: OpRef 0 / ExecId 0, no node attributes or
            // metadata, and an all-`None` inbound context.
            // NOTE(review): this function has no `src_peer` parameter, so
            // components cannot see the envelope's sender through
            // `current.inbound` here. Confirm that is intended.
            current: crate::runtime::CurrentCallCtx {
                op_ref: crate::ids::OpRef::from(0u64),
                exec_id: crate::ids::ExecId::from(0u64),
                self_peer: self.self_peer,
                node_attributes: &[],
                node_metadata: &[],
                inbound: crate::runtime::InboundCtx {
                    src_peer: None,
                    wire_req_id: None,
                    arrival_ns: None,
                    remaining_deadline_ns: None,
                },
                pending_completions: Vec::new(),
                next_command_id: &mut self.exec.ids.next_command_id,
            },
        };

        // The dispatch result is intentionally ignored here.
        let _ = crate::engine::invoke::call_protocol_dispatch_atomic(
            taken.as_mut(),
            &op_name,
            &inputs_for_dispatch,
            &mut ctx,
            &self.role_dispatchers,
        );
        // Pull queued completions out of the context, then end its borrows of
        // `self` before touching `self.exec` directly.
        let captured = std::mem::take(&mut ctx.current.pending_completions);
        drop(ctx);
        self.exec.pending_completions.extend(captured);

        self.restore_component(component_ref, taken);

        Vec::new()
    }
1354
1355 /// Route a matured timer to its consumer.
1356 /// - `Sleep`/`Completion` fulfil a pending `CommandId`.
1357 /// - `Interval` re-pushes its owning Op onto the frontier at a
1358 /// fresh `ExecId`; the Op's `invoke` re-schedules the next
1359 /// firing and emits the periodic `TriggerValue` downstream.
1360 /// - `After` fulfils the parked `CommandId` (which the Op
1361 /// suspended on) with a single `TriggerValue`.
1362 fn handle_matured_timer(
1363 &mut self,
1364 kind: crate::framework::scheduler::TimerKind,
1365 ) -> Vec<EngineStep> {
1366 match kind {
1367 TimerKind::Sleep(cmd_id) | TimerKind::Completion(cmd_id) => {
1368 self.handle_completion(cmd_id, Vec::new())
1369 }
1370 TimerKind::Interval { key, .. } => {
1371 let op_ref = crate::ids::OpRef::from(key);
1372 let exec_id = self.allocate_exec_id();
1373 self.exec.frontier.push_back((op_ref, exec_id));
1374 Vec::new()
1375 }
1376 TimerKind::After { key } => {
1377 let cmd_id = CommandId::from(key);
1378 let value: Box<dyn SlotValue> = Box::new(crate::syscall::values::TriggerValue);
1379 self.handle_completion(cmd_id, vec![("trigger".to_string(), value)])
1380 }
1381 }
1382 }
1383
1384 /// Merge a sender-claimed `src_peer_addresses` list into the
1385 /// receiver's `AddressBook` entry for `src_peer`. Empty list is
1386 /// a no-op (the sender chose not to advertise). The
1387 /// skip-on-unchanged guard compares the decoded list to the
1388 /// existing entry via slice equality and elides the write when
1389 /// they match — without this the receiver would rewrite the
1390 /// entry once per envelope, swamping the address book with
1391 /// idempotent updates under load.
1392 ///
1393 /// Returns `Err(SrcAddressMergeAllocError)` when the dedup
1394 /// buffer (S4) or the address-book peer dedup (S5) cannot
1395 /// reserve. The caller surfaces this as
1396 /// `WireReceiveError::AllocationFailed`; the address-book hint
1397 /// is best-effort under allocator pressure (the envelope's
1398 /// other fills still route).
1399 fn merge_src_peer_addresses(
1400 &mut self,
1401 src_peer: crate::ids::PeerId,
1402 claimed_bytes: &[Vec<u8>],
1403 ) -> Result<(), SrcAddressMergeAllocError> {
1404 if claimed_bytes.is_empty() {
1405 return Ok(());
1406 }
1407 // S4: dedup buffer for parsed Addresses. Use try_reserve_exact
1408 // so an exhausted allocator surfaces as AllocationFailed
1409 // rather than aborting the receiver.
1410 let mut claimed: Vec<crate::framework::Address> = Vec::new();
1411 let claim_count = claimed_bytes.len();
1412 if crate::fallible::try_reserve_exact(&mut claimed, claim_count).is_err() {
1413 return Err(SrcAddressMergeAllocError {
1414 byte_count: claim_count
1415 .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1416 reason: crate::bus::AllocFailReason::HeapExhausted,
1417 });
1418 }
1419 for bytes in claimed_bytes {
1420 match crate::framework::Address::from_bytes(bytes) {
1421 Ok(addr) => claimed.push(addr),
1422 // Parse failure on one segment drops the hint without
1423 // touching the address book; the envelope's fills still
1424 // route via their own dest_suffix parsing.
1425 Err(_) => return Ok(()),
1426 }
1427 }
1428 if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1429 if existing == claimed.as_slice() {
1430 return Ok(());
1431 }
1432 }
1433 // S5: `add_peer` runs its own try_reserve_exact for the
1434 // peer-entry dedup buffer. Allocation failure surfaces here
1435 // as AddressBookError::AllocationFailed; map to the same
1436 // WireReceiveErrorKind::AllocationFailed. Other errors
1437 // (Full, EmptyAddressList) are deployment-level signals
1438 // already observable elsewhere — swallow as before.
1439 match self.framework.address_book.add_peer(src_peer, claimed) {
1440 Ok(()) => Ok(()),
1441 Err(crate::framework::AddressBookError::AllocationFailed { requested }) => {
1442 Err(SrcAddressMergeAllocError {
1443 byte_count: requested
1444 .saturating_mul(std::mem::size_of::<crate::framework::Address>()),
1445 reason: crate::bus::AllocFailReason::HeapExhausted,
1446 })
1447 }
1448 Err(_) => Ok(()),
1449 }
1450 }
1451
1452 /// Merge a transport-observed source address into the
1453 /// receiver's `AddressBook` entry for `src_peer`. Skips the
1454 /// write when the entry already contains `addr` (slice
1455 /// containment check) so a steady stream of envelopes from the
1456 /// same observed endpoint costs at most one `register_address`
1457 /// call. When no entry exists yet — the sender-claimed merge
1458 /// upstream may have short-circuited on an empty list — the
1459 /// observed address bootstraps a fresh one via `add_peer`.
1460 fn merge_src_observed_address(
1461 &mut self,
1462 src_peer: crate::ids::PeerId,
1463 addr: crate::framework::Address,
1464 ) {
1465 if let Some(existing) = self.framework.address_book.lookup(src_peer) {
1466 if existing.contains(&addr) {
1467 return;
1468 }
1469 let _ = self.framework.address_book.register_address(src_peer, addr);
1470 return;
1471 }
1472 let _ = self.framework.address_book.add_peer(src_peer, vec![addr]);
1473 }
1474
1475 /// Sender-side ingest of a `BackoffNotice` envelope. Called from
1476 /// `route_envelope` when the first fill's `type_hash` matches
1477 /// `backoff_notice_type_hash`. Decodes the payload, applies the
1478 /// remote-advised back-off via `BackoffTable::record_remote_advisory`,
1479 /// and records a `PeerGovernor::record_failure` so the existing
1480 /// 5-failure `LifecycleTransition::WentDown` path stays the single
1481 /// peer-down decision site. Returns no `EngineStep`s — the notice
1482 /// never reaches a user Component.
1483 ///
1484 /// When the source peer is unknown (the inbound `EnvelopeFrom`
1485 /// was synthesised by a transport that couldn't attribute the
1486 /// sender), the engine drops the notice silently. When the
1487 /// payload bytes fail to decode, it emits a
1488 /// `WireDecodeFailure` so operators observe the corruption.
1489 fn ingest_backoff_notice(
1490 &mut self,
1491 env: crate::envelope::WireEnvelope,
1492 src_peer: Option<crate::ids::PeerId>,
1493 ) -> Vec<EngineStep> {
1494 let Some(fill) = env.fills.into_iter().next() else {
1495 return Vec::new();
1496 };
1497 let Some(src_peer) = src_peer else {
1498 // No attributable sender; drop silently.
1499 return Vec::new();
1500 };
1501 let Some(payload) = crate::framework::BackoffNoticePayload::decode(&fill.payload) else {
1502 return vec![self.emit_wire_decode_failure(
1503 fill.type_hash,
1504 fill.payload.len(),
1505 "BackoffNoticePayload bincode decode failed".to_string(),
1506 )];
1507 };
1508
1509 let now_ns = self.framework.scheduler.now_ns();
1510 // Advise the sender-side BackoffTable using the receiver's
1511 // quoted delay (§5.2). The existing BackoffGateTx already
1512 // reads `should_retry(peer, now_ns)` and respects the new
1513 // `next_retry_ns`, so no new gate is needed.
1514 self.framework.peer_state.backoff.record_remote_advisory(
1515 src_peer,
1516 now_ns,
1517 payload.min_backoff_ns,
1518 );
1519 // Record a peer-governor failure so the existing 5-failure
1520 // `LifecycleTransition::WentDown` surfacing remains the
1521 // single down-decision path (§5.3).
1522 let transition = self
1523 .framework
1524 .peer_state
1525 .governor
1526 .record_failure(src_peer, now_ns);
1527 // Surface the WentDown lifecycle transition if the receipt
1528 // of this notice pushed the sender's local view of the peer
1529 // across the threshold. The bus event mirrors the existing
1530 // PeerSuspect/PeerDown surfacing path.
1531 if matches!(transition, crate::framework::LifecycleTransition::WentDown,) {
1532 // The existing PhiTransition path emits PeerDown by site;
1533 // here the trigger is the per-peer governor decision so
1534 // no site-level info is available. Telemetry on the
1535 // tracker entry remains via PeerHealth.
1536 }
1537 // Cause is informational on the sender side - it's already
1538 // logged at the receiver via InfraEvent::BackoffNoticeSent.
1539 let _ = payload.cause();
1540 Vec::new()
1541 }
1542
1543 /// Receiver-side back-pressure hook. When the ingress depth
1544 /// crosses the high-water mark (or
1545 /// the caller forces emission via `force = true` from the
1546 /// φ-accrual scan), consult the `BackpressureTracker` for the
1547 /// `src_peer` and - on `Decision::EmitNotice` - mint a
1548 /// `BackoffNotice` envelope back to the sender, push it onto
1549 /// `OutboundQueue`, and publish the matching
1550 /// `InfraEvent::BackoffNoticeSent`. The K-then-silent transition
1551 /// surfaces `InfraEvent::SilentDropActive` as well so operators
1552 /// see the fallback engage. Returns the resulting `EngineStep`s;
1553 /// the caller appends them to the polling step list.
1554 fn maybe_emit_backoff_notice(
1555 &mut self,
1556 src_peer: crate::ids::PeerId,
1557 cause: crate::framework::BackoffCause,
1558 hint_ns: Option<u64>,
1559 ) -> Vec<EngineStep> {
1560 // Pull config-driven mark; PhiAccrual + ExplicitDrop callers
1561 // bypass the queue-depth check because they were triggered
1562 // by an external signal (φ flip / Component reject).
1563 let force = !matches!(cause, crate::framework::BackoffCause::QueueFull);
1564 if !force {
1565 // Compare the pre-drain ingress-depth snapshot to the
1566 // configured high-water fraction of capacity. The
1567 // snapshot stays valid across every
1568 // `process_ingress_event` call inside the same poll
1569 // cycle — using the current `ingress.len()` would see
1570 // post-drain zero and never trip.
1571 let len = self.phase1_pre_drain_depth;
1572 let cap = self.ingress.capacity();
1573 if !self
1574 .framework
1575 .peer_state
1576 .backpressure
1577 .is_over_high_water(len, cap)
1578 {
1579 return Vec::new();
1580 }
1581 }
1582
1583 let now_ns = self.framework.scheduler.now_ns();
1584 // `hint_ns` for QueueFull is sized by the configured
1585 // min-notice interval (the BackpressureTracker enforces the
1586 // floor). PhiAccrual callers may pass a specific mean
1587 // inter-arrival hint; ExplicitDrop callers pass the
1588 // Component-supplied `BackpressureHint`.
1589 let min_hint = hint_ns.unwrap_or(0);
1590 let decision = self
1591 .framework
1592 .peer_state
1593 .backpressure
1594 .observe_overload(src_peer, cause, min_hint, now_ns);
1595
1596 let (min_backoff_ns, cause_chosen) = match decision {
1597 crate::framework::BackpressureDecision::EmitNotice {
1598 min_backoff_ns,
1599 cause,
1600 } => (min_backoff_ns, cause),
1601 crate::framework::BackpressureDecision::Suppress
1602 | crate::framework::BackpressureDecision::SilentDrop => return Vec::new(),
1603 };
1604
1605 // Build the notice envelope + push it on the outbound queue.
1606 let payload =
1607 crate::framework::BackoffNoticePayload::new(min_backoff_ns, cause_chosen, None);
1608 let envelope =
1609 crate::framework::build_backoff_notice_envelope(self.self_peer, src_peer, payload);
1610 self.framework.outbound_queue.push(envelope);
1611
1612 // Bus event for ops dashboards + Component authors that
1613 // want to react to local overload signals.
1614 self.bus.publish(crate::bus::NodeEvent::Infra(
1615 crate::bus::InfraEvent::BackoffNoticeSent {
1616 peer: src_peer,
1617 cause: cause_chosen,
1618 min_backoff_ns,
1619 },
1620 ));
1621
1622 // If this emission flipped the peer into silent-drop mode,
1623 // surface `SilentDropActive` once so operators see the
1624 // K-then-silent transition.
1625 if self
1626 .framework
1627 .peer_state
1628 .backpressure
1629 .is_silent_drop_active(src_peer)
1630 {
1631 self.bus.publish(crate::bus::NodeEvent::Infra(
1632 crate::bus::InfraEvent::SilentDropActive { peer: src_peer },
1633 ));
1634 }
1635
1636 Vec::new()
1637 }
1638}
1639
/// Reports whether a poll cycle has used up its op-invocation budget.
///
/// A `None` budget means the cap is disabled, so the answer is always
/// `false`; with `Some(cap)` the budget is hit once `ops_invoked`
/// reaches `cap` (so a cap of `0` is hit immediately).
#[inline]
fn budget_hit(budget: Option<usize>, ops_invoked: usize) -> bool {
    match budget {
        Some(cap) => ops_invoked >= cap,
        None => false,
    }
}
1646
/// Reservation failure returned by `merge_src_peer_addresses`. The caller builds
/// a single `WireReceiveError::AllocationFailed` `EngineStep` from it, carrying
/// the bytes the boundary tried to claim.
///
/// It is constructed at two sites in `merge_src_peer_addresses`:
/// - S4: the `try_reserve_exact` for the claimed-address dedup buffer;
/// - S5: `AddressBook::add_peer` reporting `AddressBookError::AllocationFailed`
///   for its peer-entry dedup buffer.
struct SrcAddressMergeAllocError {
    /// Approximate bytes the failing reservation requested. Both sites compute it
    /// as `address_count * size_of::<Address>()` with saturating multiplication.
    /// Per the original design, the caller mirrors it into the
    /// `WireReceiveError::AllocationFailed::byte_count` field for telemetry. The
    /// caller is outside this view.
    byte_count: usize,
    /// Why the reservation failed. Both construction sites set `HeapExhausted`.
    /// This boundary has no per-item cap; cap-driven rejection lives on the
    /// application-ingress path.
    reason: crate::bus::AllocFailReason,
}
1663
1664#[cfg(test)]
1665#[path = "poll_recv_seed_tests.rs"]
1666mod poll_recv_seed_tests;
1667
1668#[cfg(test)]
1669#[path = "poll_bus_routing_tests.rs"]
1670mod poll_bus_routing_tests;
1671
1672#[cfg(test)]
1673#[path = "poll_ingress_handler_tests.rs"]
1674mod poll_ingress_handler_tests;
1675
1676#[cfg(test)]
1677#[path = "poll_budget_tests.rs"]
1678mod poll_budget_tests;
1679
1680#[cfg(test)]
1681#[path = "poll_async_error_tests.rs"]
1682mod poll_async_error_tests;
1683
1684#[cfg(test)]
1685#[path = "poll_wire_timeout_tests.rs"]
1686mod poll_wire_timeout_tests;
1687
1688#[cfg(test)]
1689#[path = "introspection_tests.rs"]
1690mod introspection_tests;
1691
1692#[cfg(test)]
1693#[path = "peer_governor_tests.rs"]
1694mod peer_governor_tests;
1695
1696#[cfg(test)]
1697#[path = "poll_backpressure_tests.rs"]
1698mod poll_backpressure_tests;
1699
1700#[cfg(test)]
1701#[path = "poll_src_peer_addresses_tests.rs"]
1702mod poll_src_peer_addresses_tests;
1703
1704#[cfg(test)]
1705#[path = "poll_observed_address_tests.rs"]
1706mod poll_observed_address_tests;
1707
1708#[cfg(test)]
1709#[path = "poll_typed_receive_tests.rs"]
1710mod poll_typed_receive_tests;
1711
1712#[cfg(test)]
1713#[path = "poll_ingress_alloc_tests.rs"]
1714mod poll_ingress_alloc_tests;
1715
1716#[cfg(test)]
1717#[path = "poll_backend_materialize_tests.rs"]
1718mod poll_backend_materialize_tests;