1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
use super::*;
impl MeerkatMachine {
    /// Executes a drain command that may mutate peer-ingress ownership or
    /// report a drain-task exit.
    ///
    /// Handles `MeerkatMachineCommand::SetPeerIngressContext` and
    /// `MeerkatMachineCommand::NotifyDrainExited`. DSL inputs are staged while
    /// holding the session's mutation gate (when one exists). Shell-side drain
    /// mechanics only run after the DSL has accepted the corresponding input.
    ///
    /// # Errors
    ///
    /// * `SetPeerIngressContext`:
    ///   - `RuntimeDriverError::NotReady { state: RuntimeState::Destroyed }`
    ///     when the session is not registered;
    ///   - `RuntimeDriverError::Destroyed` when the session runtime state is
    ///     already `Destroyed`;
    ///   - `RuntimeDriverError::ValidationFailed` when the DSL rejects the
    ///     keep-alive, attach, or detach input;
    ///   - any error propagated from `update_peer_ingress_context_inner`.
    /// * `NotifyDrainExited`: the error produced by
    ///   `classify_session_dsl_rejection` when the DSL rejects the input. An
    ///   exit reported for an unregistered session is an accepted no-op.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if called with any other command variant.
    pub(super) async fn execute_meerkat_machine_drain_command(
        self: &Arc<Self>,
        command: MeerkatMachineCommand,
    ) -> Result<MeerkatMachineCommandResult, RuntimeDriverError> {
        match command {
            MeerkatMachineCommand::SetPeerIngressContext {
                session_id,
                keep_alive,
                comms_runtime,
                mob_id,
            } => {
                // Guard: session must exist.
                if !self.sessions.read().await.contains_key(&session_id) {
                    return Err(RuntimeDriverError::NotReady {
                        state: RuntimeState::Destroyed,
                    });
                }
                // Guard: DrainBindingInvariant — no drain mutation on destroyed
                // sessions.
                if matches!(
                    self.existing_session_runtime_state(&session_id).await,
                    Some(RuntimeState::Destroyed)
                ) {
                    return Err(RuntimeDriverError::Destroyed);
                }
                // Serialize with other mutations of this session for the rest
                // of the arm (the guard lives until the arm returns).
                let gate = self.session_mutation_gate(&session_id).await;
                let _gate_guard = match gate {
                    Some(ref g) => Some(g.lock().await),
                    None => None,
                };
                self.stage_session_dsl_input(
                    &session_id,
                    crate::meerkat_machine::dsl::MeerkatMachineInput::SetPeerIngressContext {
                        keep_alive,
                    },
                    "SetPeerIngressContext",
                )
                .await
                .map_err(|reason| RuntimeDriverError::ValidationFailed { reason })?;
                // W2-G (issue #264): peer-ingress ownership is tracked by
                // the DSL via `peer_ingress_owner_kind` +
                // `peer_ingress_comms_runtime_id` + `peer_ingress_mob_id`.
                // The `SetPeerIngressContext` transition above is a
                // keep-alive-only self-loop that doesn't mutate those
                // fields; the ownership-establishing transitions are
                // `AttachSessionIngress` / `AttachMobIngress` at
                // `meerkat-runtime/src/meerkat_machine/dsl.rs:6375-6407`.
                //
                // When a `comms_runtime` is provided:
                // - If `mob_id` is present → fire `AttachMobIngress` to
                //   record `MobOwned { comms_runtime_id, mob_id }`.
                // - Otherwise → fire `AttachSessionIngress` to record
                //   `SessionOwned { comms_runtime_id }`.
                // The attach/detach transitions below encode exact
                // idempotence in DSL guards. Any remaining rejection is an
                // authoritative denial (for example a session attach trying
                // to downgrade a mob-owned ingress) and must stop before the
                // drain shell mutates task state.
                //
                // When `comms_runtime` is `None` the caller intends
                // keep-alive-only; no ownership transition fires.
                if let Some(ref runtime) = comms_runtime {
                    let comms_runtime_id =
                        crate::meerkat_machine::dsl::CommsRuntimeId::from_runtime(runtime);
                    let attach_input = if let Some(mob_id) = mob_id {
                        crate::meerkat_machine::dsl::MeerkatMachineInput::AttachMobIngress {
                            comms_runtime_id,
                            mob_id,
                        }
                    } else {
                        crate::meerkat_machine::dsl::MeerkatMachineInput::AttachSessionIngress {
                            comms_runtime_id,
                        }
                    };
                    self.stage_session_dsl_input(&session_id, attach_input, "AttachPeerIngress")
                        .await
                        .map_err(|reason| RuntimeDriverError::ValidationFailed { reason })?;
                } else if !keep_alive {
                    // keep_alive=false + comms_runtime=None → caller is
                    // tearing down the drain. Fire `DetachIngress` to
                    // clear any active ownership. The DSL accepts exact
                    // no-op detach; other rejection stops the shell here.
                    self.stage_session_dsl_input(
                        &session_id,
                        crate::meerkat_machine::dsl::MeerkatMachineInput::DetachIngress,
                        "DetachIngress",
                    )
                    .await
                    .map_err(|reason| RuntimeDriverError::ValidationFailed { reason })?;
                }
                // Every ownership input staged above was accepted (any DSL
                // rejection has already returned `ValidationFailed`), so it
                // is now safe to dispatch the mechanical drain-task lifecycle
                // side-effect. `update_peer_ingress_context_inner` stages
                // `SpawnDrain` / `Abort` on the DSL and, on DSL accept,
                // spawns / aborts the drain task. On DSL rejection it returns
                // false without mutating shell slot state (preserving the
                // bdd460951 invariant "no shell mutation after DSL
                // rejection"). Its return value is the typed `Spawned(bool)`
                // result the caller of `maybe_spawn_comms_drain` observes.
                let spawned = self
                    .update_peer_ingress_context_inner(&session_id, keep_alive, comms_runtime)
                    .await?;
                Ok(MeerkatMachineCommandResult::Spawned(spawned))
            }
            MeerkatMachineCommand::NotifyDrainExited { session_id, reason } => {
                // D2b: a drain-exit observation arriving after the session's
                // sessions-map entry is gone is a legitimate post-teardown
                // interleaving (the unregister drain aborts the drain task and
                // removes the entry; a straggling exit can still be reported).
                // It is observation-shaped and benign — surface it as an
                // accepted no-op, not a `Destroyed` error.
                if !self.sessions.read().await.contains_key(&session_id) {
                    tracing::debug!(
                        %session_id,
                        ?reason,
                        "post-teardown drain-exit observation (benign no-op)"
                    );
                    return Ok(MeerkatMachineCommandResult::Unit);
                }
                let gate = self.session_mutation_gate(&session_id).await;
                let _gate_guard = match gate {
                    Some(ref g) => Some(g.lock().await),
                    None => None,
                };
                // Stage-first: NotifyDrainExited is not declared from
                // Destroyed (DrainBindingInvariant); the machine rejects it
                // there and the rejection is classified as the terminal
                // `Destroyed` truth.
                if let Err(reason) = self
                    .stage_session_dsl_input(
                        &session_id,
                        crate::meerkat_machine::dsl::MeerkatMachineInput::NotifyDrainExited {
                            reason: crate::meerkat_machine::dsl::DrainExitReason::from(reason),
                        },
                        "NotifyDrainExited",
                    )
                    .await
                {
                    return Err(self
                        .classify_session_dsl_rejection(&session_id, reason)
                        .await);
                }
                self.notify_comms_drain_exited_inner(&session_id, reason)
                    .await;
                Ok(MeerkatMachineCommandResult::Unit)
            }
            _ => unreachable!("non-drain command routed to drain handler"),
        }
    }

    /// Executes a drain-task mechanics command: `AbortAll`, `Abort`, or `Wait`.
    ///
    /// `StopDrain` is staged on the DSL only for sessions whose drain
    /// authority reports `DrainPhase::Running`. A session's drain slot is
    /// aborted only when the drain was not running or the DSL accepted
    /// `StopDrain`. `Wait` takes the drain task handle, awaits it, and then
    /// submits a `NotifyDrainExited { Failed }` safety net if the authority
    /// still reports `Running`.
    ///
    /// # Errors
    ///
    /// `Abort` and `Wait` return
    /// `RuntimeDriverError::NotReady { state: RuntimeState::Destroyed }` when
    /// the session is not registered. DSL rejections are logged, not returned.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if called with any other command variant.
    pub(super) async fn execute_meerkat_machine_drain_local_command(
        &self,
        command: MeerkatMachineCommand,
    ) -> Result<MeerkatMachineCommandResult, RuntimeDriverError> {
        match command {
            MeerkatMachineCommand::AbortAll => {
                // Stage StopDrain for each session whose generated drain
                // authority says Running. The shell slot is only task
                // mechanics and does not decide lifecycle.
                let session_states: Vec<(meerkat_core::types::SessionId, bool)> = {
                    let sessions = self.sessions.read().await;
                    sessions
                        .iter()
                        .map(|(sid, entry)| {
                            let authority = entry
                                .dsl_authority
                                .lock()
                                .unwrap_or_else(std::sync::PoisonError::into_inner);
                            (
                                sid.clone(),
                                authority.state().drain_phase
                                    == crate::meerkat_machine::dsl::DrainPhase::Running,
                            )
                        })
                        .collect()
                };
                // Consume the snapshot so ids move into the accepted list
                // instead of being cloned a second time.
                let mut accepted_session_ids = Vec::with_capacity(session_states.len());
                for (sid, running) in session_states {
                    // Non-running drains need no StopDrain; their slot is
                    // still cleared below.
                    if !running || self.stage_drain_stop_dsl(&sid).await {
                        accepted_session_ids.push(sid);
                    }
                }
                let mut sessions = self.sessions.write().await;
                for sid in accepted_session_ids {
                    if let Some(entry) = sessions.get_mut(&sid) {
                        abort_slot(&mut entry.drain_slot);
                    }
                }
                Ok(MeerkatMachineCommandResult::Unit)
            }
            MeerkatMachineCommand::Abort { session_id } => {
                // Guard: session must be registered.
                if !self.sessions.read().await.contains_key(&session_id) {
                    return Err(RuntimeDriverError::NotReady {
                        state: RuntimeState::Destroyed,
                    });
                }
                // Stage StopDrain if generated drain authority says Running.
                let drain_is_running =
                    self.drain_authority_state(&session_id)
                        .await
                        .is_some_and(|state| {
                            state.phase == crate::meerkat_machine::dsl::DrainPhase::Running
                        });
                // A rejected StopDrain (already logged by
                // `stage_drain_stop_dsl`) leaves the slot untouched.
                if drain_is_running && !self.stage_drain_stop_dsl(&session_id).await {
                    return Ok(MeerkatMachineCommandResult::Unit);
                }
                let mut sessions = self.sessions.write().await;
                if let Some(entry) = sessions.get_mut(&session_id) {
                    abort_slot(&mut entry.drain_slot);
                }
                Ok(MeerkatMachineCommandResult::Unit)
            }
            MeerkatMachineCommand::Wait { session_id } => {
                // Guard: session must be registered.
                if !self.sessions.read().await.contains_key(&session_id) {
                    return Err(RuntimeDriverError::NotReady {
                        state: RuntimeState::Destroyed,
                    });
                }
                // Take the handle under the write lock, then release the lock
                // before awaiting the task.
                let handle = {
                    let mut sessions = self.sessions.write().await;
                    sessions
                        .get_mut(&session_id)
                        .and_then(|entry| entry.drain_slot.take_handle())
                };
                if let Some(handle) = handle {
                    let _ = handle.await;
                }
                // Re-read post-await to safety-net a panicked task that
                // never notified the authority. The generated
                // `NotifyDrainExited { Failed }` transition owns the
                // resulting clean-vs-respawnable state; the shell then only
                // clears task-handle mechanics after acceptance.
                let drain_is_running =
                    self.drain_authority_state(&session_id)
                        .await
                        .is_some_and(|state| {
                            state.phase == crate::meerkat_machine::dsl::DrainPhase::Running
                        });
                if drain_is_running {
                    // Log the intent before submitting so the warnings read in
                    // causal order (a rejection warning follows, if any).
                    tracing::warn!(
                        %session_id,
                        "comms_drain: task exited without notifying authority (likely panicked), \
                         submitting Failed safety net"
                    );
                    let staged = self
                        .stage_session_dsl_input(
                            &session_id,
                            crate::meerkat_machine::dsl::MeerkatMachineInput::NotifyDrainExited {
                                reason: crate::meerkat_machine::dsl::DrainExitReason::Failed,
                            },
                            "NotifyDrainExited(safety)",
                        )
                        .await;
                    match staged {
                        Ok(_) => {
                            self.project_comms_drain_failed_safety_net(&session_id)
                                .await;
                        }
                        Err(err) => {
                            tracing::warn!(
                                %session_id,
                                error = %err,
                                "DSL rejected drain exit safety net"
                            );
                        }
                    }
                }
                Ok(MeerkatMachineCommandResult::Unit)
            }
            _ => unreachable!("non-drain-local command routed to drain-local handler"),
        }
    }

    /// Stages the typed `StopDrain` DSL input for `session_id`.
    ///
    /// Returns `true` when the machine accepted the transition. On rejection
    /// it logs a warning and returns `false`. Callers use the result to decide
    /// whether to apply the shell-side abort projection.
    async fn stage_drain_stop_dsl(&self, session_id: &meerkat_core::types::SessionId) -> bool {
        if let Err(err) = self
            .stage_session_dsl_input(
                session_id,
                crate::meerkat_machine::dsl::MeerkatMachineInput::StopDrain,
                "StopDrain",
            )
            .await
        {
            tracing::warn!(
                %session_id,
                error = %err,
                "DSL rejected StopDrain; skipping drain abort"
            );
            false
        } else {
            true
        }
    }
}