1use std::collections::HashMap;
21use std::sync::Arc;
22
23use prost::Message;
24
25use crate::concrete::ComponentHandle;
26use crate::engine::Engine;
27use crate::errors::delivery::DeliveryError;
28use crate::errors::restore::RestoreError;
29use crate::snapshot::transient::TransientSnapshot;
30use crate::snapshot::{NamedComponentSnapshot, NamedGraphSnapshot, NodeConfigSnapshot};
31use bb_ir::proto::onnx::ModelProto;
32
33pub type TelemetryTap = Box<dyn FnMut(&crate::engine::EngineStep)>;
37
38pub struct Node {
44 pub(crate) engine: Engine,
45 pub(crate) config: NodeConfig,
46 pub(crate) incarnation: u64,
47 pub(crate) module_index: HashMap<String, Arc<ModelProto>>,
56 pub(crate) model: Option<Arc<ModelProto>>,
62 pub(crate) component_handles: Vec<ComponentHandle>,
63 pub(crate) telemetry_tap: Option<TelemetryTap>,
67}
68
69impl std::fmt::Debug for Node {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("Node")
72 .field("incarnation", &self.incarnation)
73 .field("modules", &self.module_index.keys().collect::<Vec<_>>())
74 .field("component_count", &self.component_handles.len())
75 .finish_non_exhaustive()
76 }
77}
78
79impl Node {
82 #[doc(hidden)]
102 pub fn new(peer_id: crate::ids::PeerId, addresses: Vec<crate::framework::Address>) -> Self {
103 let config = NodeConfig::new(peer_id);
104 let mut engine = Engine::with_bus_capacity(config.bus_capacity);
105 engine.self_peer = peer_id;
106 engine.apply_config_caps(&config);
107 engine.register_all_framework_syscalls();
108 let real_addresses: Vec<crate::framework::Address> = addresses
109 .into_iter()
110 .filter(|a| a != &crate::framework::Address::empty())
111 .collect();
112 if !real_addresses.is_empty() {
113 let _ = engine
114 .framework
115 .address_book
116 .add_peer(peer_id, real_addresses);
117 }
118 Self {
119 engine,
120 config,
121 incarnation: 0,
122 module_index: HashMap::new(),
123 model: None,
124 component_handles: Vec::new(),
125 telemetry_tap: None,
126 }
127 }
128
129 pub fn with_config(mut self, cfg: NodeConfig) -> Self {
133 self.config = cfg.clone();
134 self.engine.apply_config_caps(&cfg);
135 self
136 }
137}
138
139impl Node {
142 pub fn snapshot(&self) -> Result<crate::snapshot::NodeSnapshot, crate::errors::SnapshotError> {
147 let queued = self.engine.bus.len();
153 if queued > 0 {
154 return Err(crate::errors::SnapshotError::BusNotDrained { queued, dropped: 0 });
155 }
156 Ok(self.snapshot_inner())
157 }
158
159 fn snapshot_inner(&self) -> crate::snapshot::NodeSnapshot {
160 let mut components: Vec<NamedComponentSnapshot> = Vec::new();
161 for handle in &self.component_handles {
162 let cref = crate::ids::ComponentRef::from(handle.instance_id);
163 let Some(instance) = self.engine.component(cref) else {
164 continue;
165 };
166 let state_bytes = (handle.serialize_fn)(instance);
167 components.push(NamedComponentSnapshot {
168 type_name: handle.type_name.to_string(),
169 instance_id: handle.instance_id,
170 package: handle.package,
171 state_bytes,
172 });
173 }
174
175 let mut graphs: Vec<NamedGraphSnapshot> = Vec::new();
176 for (name, installed) in self.engine.graphs_named() {
177 let function_proto_bytes = installed.function.encode_to_vec();
178 graphs.push(NamedGraphSnapshot {
179 name: name.to_string(),
180 function_proto_bytes,
181 });
182 }
183
184 let counters: std::collections::HashMap<String, u64> =
185 self.engine.framework.counters.clone();
186
187 let event_subscriptions: std::collections::HashMap<String, Vec<u64>> = self
188 .engine
189 .event_subscriptions
190 .iter()
191 .map(|(kind, sites)| {
192 (
193 kind.clone(),
194 sites.iter().map(|s| s.as_u64()).collect::<Vec<u64>>(),
195 )
196 })
197 .collect();
198
199 let address_book: Vec<crate::snapshot::transient::AddressBookEntrySnapshot> = self
200 .engine
201 .framework
202 .address_book
203 .iter()
204 .map(
205 |(peer, addrs, ref_count)| crate::snapshot::transient::AddressBookEntrySnapshot {
206 peer_id: peer.to_bytes(),
207 addresses: addrs.iter().map(|a| a.to_bytes()).collect(),
208 ref_count,
209 },
210 )
211 .collect();
212
213 let peer_governor = capture_peer_governor(&self.engine.framework.peer_state.governor);
214 let backoff_table = self
215 .engine
216 .framework
217 .peer_state
218 .backoff
219 .iter()
220 .map(|(p, s)| crate::snapshot::transient::BackoffEntry {
221 peer: p.to_bytes(),
222 attempts: s.attempts,
223 last_attempt_ns: s.last_attempt_ns,
224 next_retry_ns: s.next_retry_ns,
225 })
226 .collect();
227 let pending_async = self
228 .engine
229 .exec
230 .pending_async
231 .iter()
232 .map(|(cmd, p)| {
233 (
234 cmd.as_u64(),
235 crate::snapshot::transient::PendingAsyncSnapshot {
236 op_ref: p.op_ref.as_u64(),
237 exec_id: p.exec_id.as_u64(),
238 output_sites: p.output_sites.iter().map(|s| s.as_u64()).collect(),
239 deadline_ns: p.deadline_ns,
240 },
241 )
242 })
243 .collect();
244 let pending_outbound = self
245 .engine
246 .framework
247 .outbound_queue
248 .iter_for_snapshot()
249 .map(|env| crate::snapshot::transient::PendingOutboundEntry {
250 envelope_bytes: crate::envelope::EnvelopeCodec::encode(env),
251 redelivered: true,
252 })
253 .collect();
254
255 let transient = TransientSnapshot {
256 framework: crate::snapshot::transient::FrameworkSnapshot {
257 counters,
258 fired_phases: self.engine.fired_phases.clone(),
259 address_book,
260 peer_governor,
261 backoff_table,
262 pending_outbound,
263 peer_id_bytes: self.engine.self_peer.to_bytes(),
268 next_command_id: self.engine.exec.ids.next_command_id,
269 next_exec_id: self.engine.exec.ids.next_exec_id,
270 spec_version: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
271 },
272 bus: crate::snapshot::transient::TypedBusSnapshot {
273 event_subscriptions,
274 },
275 pending_async,
276 ..Default::default()
277 };
278
279 crate::snapshot::NodeSnapshot {
280 incarnation: self.incarnation,
281 config: NodeConfigSnapshot::from(&self.config),
282 graphs,
283 components,
284 transient,
285 }
286 }
287
288 pub fn restore(
290 &mut self,
291 snap: crate::snapshot::NodeSnapshot,
292 ) -> Result<(), crate::errors::restore::RestoreError> {
293 if snap.transient.framework.spec_version
294 != crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION
295 {
296 return Err(RestoreError::SpecVersionMismatch {
297 got: snap.transient.framework.spec_version,
298 expected: crate::snapshot::transient::CURRENT_SNAPSHOT_SPEC_VERSION,
299 });
300 }
301
302 for graph_snap in snap.graphs {
307 let function = bb_ir::proto::onnx::FunctionProto::decode(
308 graph_snap.function_proto_bytes.as_slice(),
309 )
310 .map_err(|e| {
311 RestoreError::SnapshotMismatch(format!(
312 "restore: failed to decode graph `{}`: {e}",
313 graph_snap.name,
314 ))
315 })?;
316 self.engine.install_graph(graph_snap.name, function);
317 }
318
319 for comp_snap in snap.components {
320 let cref = crate::ids::ComponentRef::from(comp_snap.instance_id);
321 let Some(handle) = self.component_handles.iter().find(|h| {
322 h.type_name == comp_snap.type_name && h.instance_id == comp_snap.instance_id
323 }) else {
324 return Err(RestoreError::SnapshotMismatch(format!(
325 "no handle on live Node for component {}@{}",
326 comp_snap.type_name, comp_snap.instance_id,
327 )));
328 };
329 let restored = (handle.restore_fn)(&comp_snap.state_bytes).map_err(|source| {
330 RestoreError::ComponentRestoreFailed {
331 type_name: comp_snap.type_name.clone(),
332 source,
333 }
334 })?;
335 self.engine.register_component(cref, restored);
336 }
337
338 self.engine.framework.counters.clear();
339 for (name, value) in snap.transient.framework.counters {
340 self.engine.framework.counters.insert(name, value);
341 }
342 self.engine.fired_phases = snap.transient.framework.fired_phases;
343 self.engine.event_subscriptions.clear();
344 for (kind, sites) in snap.transient.bus.event_subscriptions {
345 self.engine.event_subscriptions.insert(
346 kind,
347 sites
348 .into_iter()
349 .map(crate::ids::NodeSiteId::from)
350 .collect(),
351 );
352 }
353
354 for entry in snap.transient.framework.address_book {
355 let mut decoded: Vec<crate::framework::Address> =
356 Vec::with_capacity(entry.addresses.len());
357 let mut malformed = false;
358 for bytes in &entry.addresses {
359 match crate::framework::Address::from_bytes(bytes) {
360 Ok(a) => decoded.push(a),
361 Err(_) => {
362 malformed = true;
363 break;
364 }
365 }
366 }
367 if malformed || decoded.is_empty() {
368 continue;
369 }
370 let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer_id) else {
371 continue;
372 };
373 self.engine
374 .framework
375 .address_book
376 .restore_entry(peer, decoded, entry.ref_count);
377 }
378 self.engine.framework.peer_state.governor = crate::framework::PeerGovernor::new();
379 let governor_snap = snap.transient.framework.peer_governor;
380 self.engine
381 .framework
382 .peer_state
383 .governor
384 .set_failure_threshold(governor_snap.failure_threshold);
385 for peer_bytes in governor_snap.blocklist {
386 let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
387 continue;
388 };
389 self.engine.framework.peer_state.governor.block(peer);
390 }
391 if let Some(allow) = governor_snap.allowlist {
392 let set: std::collections::HashSet<_> = allow
393 .into_iter()
394 .filter_map(|b| crate::ids::PeerId::from_bytes(&b).ok())
395 .collect();
396 self.engine
397 .framework
398 .peer_state
399 .governor
400 .set_allowlist(Some(set));
401 }
402 for (peer_bytes, consecutive_failures, last_event_ns, down) in governor_snap.health {
403 let Ok(peer) = crate::ids::PeerId::from_bytes(&peer_bytes) else {
404 continue;
405 };
406 self.engine.framework.peer_state.governor.restore_health(
407 peer,
408 crate::framework::PeerHealth {
409 consecutive_failures,
410 last_event_ns,
411 down,
412 },
413 );
414 }
415 for entry in snap.transient.framework.backoff_table {
416 let Ok(peer) = crate::ids::PeerId::from_bytes(&entry.peer) else {
417 continue;
418 };
419 self.engine.framework.peer_state.backoff.restore_state(
420 peer,
421 crate::framework::backoff_table::BackoffState {
422 attempts: entry.attempts,
423 last_attempt_ns: entry.last_attempt_ns,
424 next_retry_ns: entry.next_retry_ns,
425 },
426 );
427 }
428 for (cmd_u64, snap_p) in snap.transient.pending_async {
429 self.engine.exec.pending_async.insert(
430 crate::ids::CommandId::from(cmd_u64),
431 crate::engine::PendingAsync {
432 op_ref: crate::ids::OpRef::from(snap_p.op_ref),
433 exec_id: crate::ids::ExecId::from(snap_p.exec_id),
434 output_sites: snap_p
435 .output_sites
436 .into_iter()
437 .map(crate::ids::NodeSiteId::from)
438 .collect(),
439 deadline_ns: snap_p.deadline_ns,
440 },
441 );
442 }
443 for entry in snap.transient.framework.pending_outbound {
444 if let Ok(env) = crate::envelope::EnvelopeCodec::decode_capped(
445 &entry.envelope_bytes,
446 &self.config.envelope_caps,
447 ) {
448 self.engine.framework.outbound_queue.push(env);
449 }
450 }
451
452 self.incarnation = snap.incarnation + 1;
453
454 Ok(())
455 }
456
457 pub fn clear(&mut self) {
460 self.engine.exec.frontier.clear();
461 self.engine.exec.slot_table.clear();
462 self.engine.exec.execution_state.clear();
463 self.engine.exec.pending_async.clear();
464 self.engine.exec.pending_completions.clear();
465 let _ = self.engine.ingress.drain_all();
466 self.engine.fired_phases.clear();
467 self.engine.framework.counters.clear();
468 }
469
470 pub fn incarnation(&self) -> u64 {
472 self.incarnation
473 }
474
475 pub fn loaded_modules(&self) -> Vec<&str> {
477 self.module_index.keys().map(|s| s.as_str()).collect()
478 }
479
480 pub fn linked_components(&self) -> Vec<&ComponentHandle> {
482 self.component_handles.iter().collect()
483 }
484
485 pub fn peer_id(&self) -> crate::ids::PeerId {
487 self.config.peer_id
488 }
489
490 pub fn execution_state(
492 &self,
493 exec_id: crate::ids::ExecId,
494 ) -> Option<&crate::engine::ExecutionState> {
495 self.engine.exec.execution_state.get(&exec_id)
496 }
497
498 pub fn pending_async_count(&self) -> usize {
500 self.engine.exec.pending_async.len()
501 }
502
503 pub fn engine_stats(&self) -> crate::engine::EngineStats {
505 self.engine.engine_stats()
506 }
507
508 #[doc(hidden)]
516 pub fn engine_install_handle(&mut self) -> &mut crate::engine::Engine {
517 &mut self.engine
518 }
519
520 #[doc(hidden)]
526 pub fn push_linked_component(&mut self, handle: crate::concrete::ComponentHandle) {
527 self.component_handles.push(handle);
528 }
529
530 #[doc(hidden)]
537 pub fn set_model(&mut self, model: ModelProto) {
538 self.model = Some(Arc::new(model));
539 }
540
541 #[doc(hidden)]
549 pub fn register_module(&mut self, module_name: String) {
550 let model = self
551 .model
552 .clone()
553 .expect("Node::set_model must run before register_module");
554 self.module_index.insert(module_name, model);
555 }
556
557 pub fn model(&self) -> Option<Arc<ModelProto>> {
563 self.model.clone()
564 }
565
566 pub fn slot(&self, slot_name: &str) -> Option<crate::ids::ComponentRef> {
570 self.engine.slot(slot_name)
571 }
572
573 pub fn slots_iter(&self) -> impl Iterator<Item = (&str, crate::ids::ComponentRef)> {
576 self.engine.slots_iter()
577 }
578
579 pub fn roles_for(
587 &self,
588 cref: crate::ids::ComponentRef,
589 ) -> std::collections::HashSet<crate::registry::ComponentRole> {
590 self.engine.roles_for(cref)
591 }
592
593 pub fn block_peer(&mut self, peer: crate::ids::PeerId) {
595 self.engine.framework.peer_state.governor.block(peer);
596 }
597
598 pub fn unblock_peer(&mut self, peer: crate::ids::PeerId) {
600 self.engine.framework.peer_state.governor.unblock(peer);
601 }
602
603 pub fn set_allowlist(
605 &mut self,
606 allowlist: Option<std::collections::HashSet<crate::ids::PeerId>>,
607 ) {
608 self.engine
609 .framework
610 .peer_state
611 .governor
612 .set_allowlist(allowlist);
613 }
614
615 pub fn peer_health(&self, peer: crate::ids::PeerId) -> Option<crate::framework::PeerHealth> {
617 self.engine.framework.peer_state.governor.peer_health(peer)
618 }
619
620 pub fn resolve_peer_addresses(
622 &self,
623 peer: crate::ids::PeerId,
624 ) -> Option<Vec<crate::framework::Address>> {
625 self.engine
626 .framework
627 .address_book
628 .lookup(peer)
629 .map(|addrs| addrs.to_vec())
630 }
631
632 pub fn set_telemetry_tap<F>(&mut self, tap: F)
634 where
635 F: FnMut(&crate::engine::EngineStep) + 'static,
636 {
637 self.telemetry_tap = Some(Box::new(tap));
638 }
639
640 pub fn add_peer(
642 &mut self,
643 peer: crate::ids::PeerId,
644 addresses: Vec<crate::framework::Address>,
645 ) -> Result<(), crate::framework::AddressBookError> {
646 self.engine.framework.address_book.add_peer(peer, addresses)
647 }
648
649 pub fn ingress_handle(&self) -> crate::ingress::IngressQueueRef {
653 crate::ingress::IngressQueueRef::new(self.engine.ingress_queue_handle())
654 }
655
656 pub fn config(&self) -> &NodeConfig {
658 &self.config
659 }
660
661 pub fn peer_address(&self) -> crate::framework::Address {
665 self.local_addresses()
666 .first()
667 .cloned()
668 .unwrap_or_else(crate::framework::Address::empty)
669 }
670
671 pub fn local_addresses(&self) -> &[crate::framework::Address] {
676 self.engine
677 .framework
678 .address_book
679 .lookup(self.engine.self_peer)
680 .unwrap_or(&[])
681 }
682
683 pub fn add_local_address(
686 &mut self,
687 address: crate::framework::Address,
688 ) -> Result<(), crate::framework::AddressBookError> {
689 let self_peer = self.engine.self_peer;
690 let book = &mut self.engine.framework.address_book;
691 if book.lookup(self_peer).is_some() {
692 book.register_address(self_peer, address)
693 } else {
694 book.add_peer(self_peer, vec![address])
695 }
696 }
697
698 pub fn forget_local_address(
702 &mut self,
703 address: &crate::framework::Address,
704 ) -> Result<(), crate::framework::AddressBookError> {
705 let self_peer = self.engine.self_peer;
706 self.engine
707 .framework
708 .address_book
709 .forget_address(self_peer, address)
710 }
711
712 pub fn run_bootstrap(
742 &mut self,
743 targets: &[crate::engine::BootstrapInput<'_>],
744 ) -> Result<Vec<crate::engine::EngineStep>, crate::errors::BootstrapError> {
745 if targets.is_empty() {
746 let armed = self.engine.run_bootstrap(&[])?;
750 if !armed && !self.engine.bootstrap_pending() {
751 return Ok(Vec::new());
752 }
753 } else {
754 for req in targets {
758 if !self.engine.module_bootstrap_registered(req.target) {
759 return Err(crate::errors::BootstrapError::UnknownTarget {
760 target_name: req.target.to_string(),
761 available: self.engine.module_bootstrap_target_names(),
762 });
763 }
764 }
765 self.engine.run_bootstrap(targets)?;
766 }
767 Ok(self.drain_bootstrap())
768 }
769
770 fn drain_bootstrap(&mut self) -> Vec<crate::engine::EngineStep> {
775 let mut steps = Vec::new();
776 loop {
777 let batch = self.engine.poll();
778 let waiting = matches!(
779 batch.last(),
780 Some(crate::engine::EngineStep::WaitingOnBootstrap)
781 );
782 if let Some(tap) = self.telemetry_tap.as_mut() {
783 for step in &batch {
784 tap(step);
785 }
786 }
787 steps.extend(batch);
788 if !self.engine.bootstrap_pending() || waiting {
789 break;
790 }
791 }
792 steps
793 }
794
795 pub fn bootstrap_status(&self) -> crate::engine::bootstrap::BootstrapStatus {
802 self.engine.bootstrap_status()
803 }
804
805 pub fn poll(
812 &mut self,
813 cx: &mut std::task::Context<'_>,
814 ) -> std::task::Poll<Vec<crate::engine::EngineStep>> {
815 let steps = self.engine.poll();
816 if let Some(tap) = self.telemetry_tap.as_mut() {
817 for step in &steps {
818 tap(step);
819 }
820 }
821 if steps.is_empty() {
822 self.engine.ingress.register_waker(cx.waker());
823 return std::task::Poll::Pending;
824 }
825 std::task::Poll::Ready(steps)
826 }
827
828 pub fn deliver_inbound(
835 &mut self,
836 src_peer: crate::ids::PeerId,
837 bytes: &[u8],
838 ) -> Result<(), crate::errors::delivery::DeliveryError> {
839 let envelope =
840 crate::envelope::EnvelopeCodec::decode_capped(bytes, &self.config.envelope_caps)
841 .map_err(|e| {
842 crate::errors::delivery::DeliveryError::InvalidEnvelope(e.to_string())
843 })?;
844 self.engine
845 .ingress
846 .push(crate::ingress::IngressEvent::EnvelopeFrom {
847 src_peer,
848 envelope,
849 src_observed_address: None,
854 })
855 .map_err(|_| crate::errors::delivery::DeliveryError::IngressClosed)
856 }
857
858 pub fn deliver_event(
871 &mut self,
872 module: &str,
873 input: &str,
874 value_bytes: &[u8],
875 ) -> Result<(), crate::errors::delivery::DeliveryError> {
876 if !self.module_index.contains_key(module) {
877 return Err(DeliveryError::UnknownModule(module.to_string()));
878 }
879 let byte_count = value_bytes.len();
880 let cap = self.config.max_app_event_bytes;
881 let source = || crate::bus::AppIngressSource::AppEvent {
882 module: module.to_string(),
883 input: input.to_string(),
884 };
885 if byte_count > cap {
886 self.emit_app_ingress_error(
887 source(),
888 byte_count,
889 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap },
890 );
891 return Err(DeliveryError::OversizePayload { byte_count, cap });
892 }
893 if let Err(reason) = self.engine.try_charge(byte_count) {
894 self.emit_app_ingress_error(
895 source(),
896 byte_count,
897 crate::bus::AppIngressErrorKind::BudgetExceeded {
898 budget_remaining: reason.budget_remaining,
899 },
900 );
901 return Err(DeliveryError::BudgetExceeded {
902 byte_count,
903 budget_remaining: reason.budget_remaining,
904 });
905 }
906 let mut owned: Vec<u8> = Vec::new();
907 if crate::fallible::try_reserve_exact(&mut owned, byte_count).is_err() {
911 self.engine.release(byte_count);
914 self.emit_app_ingress_error(
915 source(),
916 byte_count,
917 crate::bus::AppIngressErrorKind::AllocationFailed {
918 reason: crate::bus::AllocFailReason::HeapExhausted,
919 },
920 );
921 return Err(DeliveryError::AllocationFailed {
922 byte_count,
923 reason: crate::bus::AllocFailReason::HeapExhausted,
924 });
925 }
926 owned.extend_from_slice(value_bytes);
927 self.engine
928 .ingress
929 .push(crate::ingress::IngressEvent::AppEvent {
930 module_name: module.to_string(),
931 input_name: input.to_string(),
932 value_bytes: owned,
933 })
934 .map_err(|_| {
935 self.engine.release(byte_count);
938 DeliveryError::IngressClosed
939 })
940 }
941
942 pub fn invoke(
951 &mut self,
952 module: &str,
953 inputs: &[(&str, &[u8])],
954 ) -> Result<crate::ids::ExecId, crate::errors::delivery::DeliveryError> {
955 if !self.module_index.contains_key(module) {
956 return Err(DeliveryError::UnknownModule(module.to_string()));
957 }
958 let input_count = inputs.len();
959 let input_cap = self.config.max_invoke_inputs;
960 let source = || crate::bus::AppIngressSource::Invoke {
961 module: module.to_string(),
962 input_count,
963 };
964 if input_count > input_cap {
965 self.emit_app_ingress_error(
969 source(),
970 input_count,
971 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: input_cap },
972 );
973 return Err(DeliveryError::TooManyInputs {
974 count: input_count,
975 cap: input_cap,
976 });
977 }
978 let total_bytes: usize = inputs
979 .iter()
980 .fold(0usize, |acc, (_, b)| acc.saturating_add(b.len()));
981 let bytes_cap = self.config.max_invoke_bytes;
982 if total_bytes > bytes_cap {
983 self.emit_app_ingress_error(
984 source(),
985 total_bytes,
986 crate::bus::AppIngressErrorKind::PerItemCapExceeded { cap: bytes_cap },
987 );
988 return Err(DeliveryError::OversizePayload {
989 byte_count: total_bytes,
990 cap: bytes_cap,
991 });
992 }
993 if let Err(reason) = self.engine.try_charge(total_bytes) {
994 self.emit_app_ingress_error(
995 source(),
996 total_bytes,
997 crate::bus::AppIngressErrorKind::BudgetExceeded {
998 budget_remaining: reason.budget_remaining,
999 },
1000 );
1001 return Err(DeliveryError::BudgetExceeded {
1002 byte_count: total_bytes,
1003 budget_remaining: reason.budget_remaining,
1004 });
1005 }
1006 let mut owned: Vec<(String, Vec<u8>)> = Vec::new();
1011 if crate::fallible::try_reserve_exact(&mut owned, input_count).is_err() {
1012 self.engine.release(total_bytes);
1013 self.emit_app_ingress_error(
1014 source(),
1015 total_bytes,
1016 crate::bus::AppIngressErrorKind::AllocationFailed {
1017 reason: crate::bus::AllocFailReason::HeapExhausted,
1018 },
1019 );
1020 return Err(DeliveryError::AllocationFailed {
1021 byte_count: total_bytes,
1022 reason: crate::bus::AllocFailReason::HeapExhausted,
1023 });
1024 }
1025 for (name, bytes) in inputs.iter() {
1026 let mut buf: Vec<u8> = Vec::new();
1027 if crate::fallible::try_reserve_exact(&mut buf, bytes.len()).is_err() {
1028 self.engine.release(total_bytes);
1029 self.emit_app_ingress_error(
1030 source(),
1031 total_bytes,
1032 crate::bus::AppIngressErrorKind::AllocationFailed {
1033 reason: crate::bus::AllocFailReason::HeapExhausted,
1034 },
1035 );
1036 return Err(DeliveryError::AllocationFailed {
1037 byte_count: total_bytes,
1038 reason: crate::bus::AllocFailReason::HeapExhausted,
1039 });
1040 }
1041 buf.extend_from_slice(bytes);
1042 owned.push(((*name).to_string(), buf));
1043 }
1044 let exec_id = self.engine.allocate_exec_id();
1045 self.engine
1046 .ingress
1047 .push(crate::ingress::IngressEvent::Invoke {
1048 module_name: module.to_string(),
1049 inputs: owned,
1050 exec_id,
1051 })
1052 .map_err(|_| {
1053 self.engine.release(total_bytes);
1054 DeliveryError::IngressClosed
1055 })?;
1056 Ok(exec_id)
1057 }
1058
1059 fn emit_app_ingress_error(
1065 &mut self,
1066 source: crate::bus::AppIngressSource,
1067 byte_count: usize,
1068 kind: crate::bus::AppIngressErrorKind,
1069 ) {
1070 self.engine.bus.publish(crate::bus::NodeEvent::Infra(
1071 crate::bus::InfraEvent::AppIngressError {
1072 source,
1073 byte_count,
1074 kind,
1075 },
1076 ));
1077 }
1078}
1079
1080fn capture_peer_governor(
1082 governor: &crate::framework::PeerGovernor,
1083) -> crate::snapshot::transient::PeerGovernorSnapshot {
1084 crate::snapshot::transient::PeerGovernorSnapshot {
1085 blocklist: governor.blocklist().iter().map(|p| p.to_bytes()).collect(),
1086 allowlist: governor
1087 .allowlist()
1088 .map(|s| s.iter().map(|p| p.to_bytes()).collect()),
1089 health: governor
1090 .iter_health()
1091 .map(|(p, h)| {
1092 (
1093 p.to_bytes(),
1094 h.consecutive_failures,
1095 h.last_event_ns,
1096 h.down,
1097 )
1098 })
1099 .collect(),
1100 failure_threshold: governor.failure_threshold(),
1101 }
1102}
1103
1104pub mod config;
1105pub mod derivation;
1106pub use config::{
1107 NodeConfig, DEFAULT_BUS_CAPACITY, DEFAULT_CYCLE_OP_BUDGET, DEFAULT_INGRESS_BYTE_BUDGET,
1108 DEFAULT_MAX_APP_EVENT_BYTES, DEFAULT_MAX_COMPLETION_RESULT_BYTES, DEFAULT_MAX_INVOKE_BYTES,
1109 DEFAULT_MAX_INVOKE_INPUTS, DEFAULT_MAX_OUTBOUND_QUEUE, DEFAULT_MAX_PENDING_ASYNC,
1110 EDGE_INGRESS_BYTE_BUDGET, EDGE_MAX_APP_EVENT_BYTES, EDGE_MAX_COMPLETION_RESULT_BYTES,
1111 EDGE_MAX_INVOKE_BYTES, EDGE_MAX_INVOKE_INPUTS,
1112};
1113
1114
// Test-only modules, each loaded from the sibling file named by `#[path]`.
#[cfg(test)]
#[path = "snapshot_fidelity_tests.rs"]
mod snapshot_fidelity_tests;

#[cfg(test)]
#[path = "shared_model_tests.rs"]
mod shared_model_tests;