1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
//! `EngineStep` - observable output of `Engine::poll` per
//! `docs/ENGINE.md` §10 + `docs/internal/IMPLEMENTATION_PLAN.md`
//! lines 751-753.
use crate::bus::{OpError, WireReceiveErrorKind};
use crate::envelope::WireEnvelope;
use crate::framework::BlockReason;
use crate::ids::{CommandId, ExecId, NodeSiteId, OpRef, PeerId};
/// One step of work the engine performed during a poll cycle.
/// `Engine::poll` returns `Vec<EngineStep>` capturing every event
/// the host can observe.
#[derive(Clone, Debug)]
pub enum EngineStep {
/// An Op completed successfully + wrote `sites_written` slots.
OpCompleted {
/// The completed Op.
op_ref: OpRef,
/// The execution it belonged to.
exec_id: ExecId,
/// Output sites the Op wrote.
sites_written: Vec<NodeSiteId>,
},
/// An Op suspended on a `CommandId` awaiting completion.
AsyncSuspended {
/// The suspended Op.
op_ref: OpRef,
/// The execution it belonged to.
exec_id: ExecId,
/// The CommandId the Op returned.
cmd_id: CommandId,
},
/// An outbound envelope is ready to ship.
SendEnvelope(WireEnvelope),
/// An app-facing event was published. Carries the topic name
/// (the `function.output` value name, or the `AppEmit` topic for
/// mid-cycle emissions) plus the serialized value bytes. When the
/// emitter is a `Notify`-style call with no payload, `value_bytes`
/// is empty.
AppEvent {
/// Module that emitted the event.
module_name: String,
/// Topic name - typically a `function.output` name for
/// top-level surface, or an explicit topic for `AppEmit`.
topic: String,
/// Serialized payload - the slot value at the emission site,
/// encoded via the type's `WireType::to_wire_bytes` path.
/// Empty for marker-only notifications.
value_bytes: Vec<u8>,
},
/// A lifecycle phase fired. fills in the per-phase
/// payload.
LifecycleFired {
/// Phase name (e.g. `"Bootstrap"`, `"PreShutdown"`).
phase: String,
},
/// The single bootstrap FunctionCall the engine seeded at install
/// completion drained to quiescence. Body ops fire on the same
/// poll cycle once this step is emitted. Bootstrap is a one-shot
/// per Node lifetime; a restored Node does not re-emit (its
/// bootstrap pass already ran pre-snapshot).
BootstrapComplete,
/// At least one bootstrap-phase op returned `DispatchResult::Async`
/// and the engine is waiting on its completion before activating
/// body ops. The host drives the completion via the ingress and
/// re-invokes `Node::run_bootstrap`.
WaitingOnBootstrap,
/// An Op failed. The error is also published on the bus as
/// `InfraEvent::OpFailure`.
OpFailed {
/// The failed Op.
op_ref: OpRef,
/// The execution it belonged to.
exec_id: ExecId,
/// The failure detail.
error: OpError,
},
/// `cycle_op_budget` was hit during a `poll()`. The engine
/// yielded mid-cascade; the host should poll again to drain
/// the remaining frontier. Emitted at most once per poll.
CycleBudgetExceeded {
/// Number of op-invocations the cycle issued before
/// yielding (== `cycle_op_budget`).
ops_invoked: usize,
},
/// `max_outbound_queue` was hit since the previous poll;
/// `count` envelopes were FIFO-dropped to make room. Emitted
/// at most once per poll.
OutboundDropped {
/// Number of envelopes dropped since the last poll.
count: usize,
},
/// An inbound wire envelope's payload could not be decoded.
/// The envelope's slot fill was dropped; this step lets the
/// host observe the drop. Carries the same context as the
/// matching `InfraEvent::WireDecodeFailure` on the bus.
WireDecodeFailed {
/// Wire-type hash the envelope advertised (0 if the
/// failure occurred before the hash could be read).
hash: u64,
/// Length of the offending payload, in bytes.
payload_size: usize,
/// Human-readable failure detail.
detail: String,
},
/// An inbound wire fill failed the typed-decode step. Mirrors
/// the bus's [`crate::bus::InfraEvent::WireReceiveError`] so
/// the host poll() caller observes the per-fill failure
/// without subscribing to the bus. Other fills in the same
/// envelope still deliver (partial-delivery semantics).
WireReceiveFailed {
/// Sender of the failing envelope, if known.
src_peer: Option<PeerId>,
/// Position of the failing fill within the envelope
/// (0-based).
fill_index: u32,
/// The `type_hash` the sender stamped on the fill.
actual_hash: u64,
/// Length of the offending payload, in bytes.
payload_size: usize,
/// Which failure mode fired.
kind: WireReceiveErrorKind,
},
/// A registered in-flight request entry was evicted by the
/// engine's per-poll `RequestTracker::drain_stale` sweep because
/// its per-entry TTL elapsed without a matching response. The
/// originator's local DAG continuation parked behind
/// `parked_op` (if `Some`) is failed with "chain timeout" via
/// the same path async-suspension completions take.
WireTimeout {
/// The chain correlation token that timed out.
wire_req_id: u64,
/// Destination site the request was dispatched to.
target_site: crate::ids::NodeSiteId,
/// Engine-clock timestamp when the originating Send fired.
started_at_ns: u64,
/// `CommandId` of the originator's parked local op, if the
/// request was registered with one.
parked_op: Option<crate::ids::CommandId>,
},
/// An inbound envelope from `peer` was dropped by the
/// [`crate::framework::PeerGovernor`] before any slot was
/// written. path -
/// the "first contact with IP" check the user flagged.
PeerBlocked {
/// The peer whose envelope was rejected.
peer: PeerId,
/// Why the envelope was rejected.
reason: BlockReason,
},
/// A peer crossed below the failure threshold and is now
/// marked down. Emitted at most once per transition.
PeerDown {
/// The peer that went down.
peer: PeerId,
},
/// A peer recovered after a failure streak.
PeerUp {
/// The peer that came back up.
peer: PeerId,
},
/// `wire::Send` could not resolve its destination peer's
/// addresses against the framework's
/// [`crate::framework::AddressBook`]. Either the peer is
/// unknown, its address list is empty, or the Send op's `peer`
/// input didn't carry a valid `PeerId`. The Send op produces
/// no envelope; the host application reacts via this event.
/// Mirrors `InfraEvent::PeerResolveFailure` on the bus -
/// telemetry-tap parity with the /// `PeerBlocked`/`PeerDown`/`PeerUp` family.
PeerResolveFailed {
/// The peer whose addresses could not be resolved. `None`
/// when the Send op had no parseable `peer` input.
peer: Option<PeerId>,
/// The Send op that failed to resolve.
op_ref: OpRef,
/// Execution this Send belonged to.
exec_id: ExecId,
},
}