1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
//! Per-conversation aggregate owned by the orchestrator: protocol state, MLS
//! service, plug-ins, state machine, durable config, and operating mode.
//! App-side wiring (phase timer, auto-vote handles) lives on
//! [`crate::app::SessionRunner`], which owns one `ConversationHandle` by value.
use prost::Message;
use tracing::info;
use crate::{
core::{
BufferedCommitCandidate, Conversation, ConversationConfig, ConversationPluginsFactory,
ConversationState, ConversationStateMachine, CoreError, FreezeBufferOutcome,
FreezeFinalizeResult, OperatingMode, ProcessResult, ProposalKind, StewardListPlugin,
compute_commit_hash, finalize_freeze_round, member_set, process_inbound,
},
ds::{APP_MSG_SUBTOPIC, OutboundPacket},
mls_crypto::{
CommitCandidate as MlsCommitCandidate, KeyPackageBytes, MlsCommitInput, MlsService,
},
protos::de_mls::messages::v1::{
AppMessage, CommitCandidate, conversation_update_request::Payload,
},
};
/// Everything the orchestrator keeps for a single conversation, gathered
/// behind one plug-in factory type parameter: protocol state, the MLS
/// service, the plug-ins, the state machine, durable config, and the
/// operating mode.
pub struct ConversationHandle<CP: ConversationPluginsFactory> {
    /// Protocol-level conversation state (approved proposals, freeze
    /// buffer, pending removals, ...).
    pub(crate) conversation: Conversation,
    /// MLS service for this conversation. Joiners still in `PendingJoin`
    /// have `None` here until a welcome is accepted and the service is
    /// installed through [`Self::attach_mls`]; it then remains `Some` until
    /// [`Self::take_mls`] removes it (conversation leave).
    mls: Option<CP::Mls>,
    /// State machine for this conversation. The orchestrator advances it in
    /// lockstep with the phase timer so that the two cannot drift apart.
    pub(crate) state_machine: ConversationStateMachine,
    /// Durable per-conversation settings: voting/consensus durations,
    /// `liveness_criteria_yes`, `pending_update_max_epochs`. Orchestrators
    /// read it; joiner-sync writes to it directly.
    pub(crate) config: ConversationConfig,
    /// Peer-score plug-in instance for this conversation.
    pub(crate) scoring: CP::Scoring,
    /// Steward-list plug-in instance for this conversation: the active
    /// list, the retry counter, and the election retry policy. Eligibility
    /// is composed by the orchestrator from the MLS member list plus
    /// `Conversation::is_pending_removal`.
    pub(crate) steward_list: CP::StewardList,
    /// Authorization mode (RFC §Layer 3 Anti-Deadlock ECP). Switched to
    /// `Recovery` when an accepted Deadlock ECP relaxes the steward gate so
    /// that any member may produce the next commit; switched back once an
    /// election is accepted.
    operating_mode: OperatingMode,
}
impl<CP: ConversationPluginsFactory> ConversationHandle<CP> {
    /// Build a fresh handle in [`OperatingMode::Normal`].
    ///
    /// The creator path passes `Some(mls)`; the joiner path passes `None`
    /// and attaches the service later via [`Self::attach_mls`].
    pub(crate) fn new(
        conversation: Conversation,
        mls: Option<CP::Mls>,
        state_machine: ConversationStateMachine,
        config: ConversationConfig,
        scoring: CP::Scoring,
        steward_list: CP::StewardList,
    ) -> Self {
        Self {
            conversation,
            mls,
            state_machine,
            config,
            scoring,
            steward_list,
            operating_mode: OperatingMode::Normal,
        }
    }

    // ── Operating mode (Layer 3 Anti-Deadlock) ──────────────────────

    /// `true` while the handle is in [`OperatingMode::Recovery`].
    pub(crate) fn is_in_recovery_mode(&self) -> bool {
        self.operating_mode == OperatingMode::Recovery
    }

    /// Switch the handle to [`OperatingMode::Recovery`].
    pub(crate) fn enter_recovery_mode(&mut self) {
        self.operating_mode = OperatingMode::Recovery;
    }

    /// Switch the handle back to [`OperatingMode::Normal`].
    pub(crate) fn exit_recovery_mode(&mut self) {
        self.operating_mode = OperatingMode::Normal;
    }

    // ── State accessor ──────────────────────────────────────────────

    /// Current state as reported by the conversation's state machine.
    pub(crate) fn current_state(&self) -> ConversationState {
        self.state_machine.current_state()
    }

    // ── MLS service ─────────────────────────────────────────────────

    /// Borrow the MLS service, if attached. `None` for joiners
    /// pre-welcome.
    pub(crate) fn mls(&self) -> Option<&CP::Mls> {
        self.mls.as_ref()
    }

    /// Borrow the MLS service.
    ///
    /// # Errors
    ///
    /// [`CoreError::MlsGroupNotInitialized`] when no service is attached.
    pub(crate) fn expect_mls(&self) -> Result<&CP::Mls, CoreError> {
        self.mls.as_ref().ok_or(CoreError::MlsGroupNotInitialized)
    }

    /// Mutable counterpart of [`Self::expect_mls`] — required for the commit
    /// pipeline and encrypt/decrypt methods that advance MLS state.
    ///
    /// # Errors
    ///
    /// [`CoreError::MlsGroupNotInitialized`] when no service is attached.
    pub(crate) fn expect_mls_mut(&mut self) -> Result<&mut CP::Mls, CoreError> {
        self.mls.as_mut().ok_or(CoreError::MlsGroupNotInitialized)
    }

    /// Attach an MLS service. Called by joiners after the welcome arrives.
    ///
    /// Any previously attached service is overwritten (and dropped); the
    /// caller is responsible for not double-attaching.
    pub(crate) fn attach_mls(&mut self, mls: CP::Mls) {
        self.mls = Some(mls);
    }

    /// Detach the MLS service and return it, leaving `None` behind. Used on
    /// conversation leave so the caller can run service-side cleanup
    /// (`mls.delete()`).
    pub(crate) fn take_mls(&mut self) -> Option<CP::Mls> {
        self.mls.take()
    }

    // ── Protocol-function wrappers ─────────────────────────────────
    //
    // Read `conversation`, `mls`, and `steward_list` from `self` so
    // coordinator callsites don't destructure the entry. The freeze and
    // inbound wrappers are pure delegation to sibling `core` modules.
    //
    // These methods borrow `self.mls` as a field (rather than through
    // `expect_mls_mut`) so that `self.conversation` and `self.steward_list`
    // stay independently borrowable while the MLS service is held.

    /// Build a commit candidate from the approved proposal queue, buffer it
    /// as the local freeze candidate, and return the packet to broadcast.
    ///
    /// Returns `Ok(None)` when nothing should be sent: either the approved
    /// batch contains this node's own removal, or every approved entry was
    /// filtered out (already reflected in the member list, or excluded by an
    /// urgent commit target).
    ///
    /// # Errors
    ///
    /// Checked in this order:
    /// - [`CoreError::MlsGroupNotInitialized`] — no MLS service attached.
    /// - [`CoreError::NotASteward`] — `self_identity` is not a steward and
    ///   the handle is not in recovery mode.
    /// - [`CoreError::NoProposals`] — the approved queue is empty.
    /// - [`CoreError::UnexpectedNonMlsProposals`] — governance proposals are
    ///   present in the approved queue; carries their ids.
    /// - [`CoreError::InvalidConversationUpdateRequest`] — an approved entry
    ///   has a payload other than `InviteMember` / `RemoveMember`.
    ///
    /// Failures from the MLS service (`members`, `create_commit_candidate`,
    /// `current_epoch`) are propagated with `?`.
    pub(crate) fn create_commit_candidate(
        &mut self,
        self_identity: &[u8],
        app_id: &[u8],
    ) -> Result<Option<OutboundPacket>, CoreError> {
        // Fail fast so a missing MLS service is reported ahead of the
        // authorization and queue checks below.
        self.expect_mls()?;
        if !self.steward_list.is_steward(self_identity) && !self.is_in_recovery_mode() {
            return Err(CoreError::NotASteward);
        }
        if self.conversation.approved_proposals().is_empty() {
            return Err(CoreError::NoProposals);
        }

        // MLS forbids committing one's own removal. If the approved batch contains
        // RemoveMember(self), skip local candidate creation — another steward will
        // commit the batch (including this node's removal) once they enter freeze.
        if self.conversation.is_pending_removal(self_identity) {
            info!(
                conversation = self.conversation.name(),
                "commit candidate skipped: approved batch contains self-remove"
            );
            return Ok(None);
        }

        // Governance proposals (emergency, election) are consensus-only and must
        // not be in the approved queue at batch creation time.
        let non_mls_ids: Vec<u32> = self
            .conversation
            .approved_proposals()
            .iter()
            .filter(|(_, req)| ProposalKind::of(req).is_governance())
            .map(|(&id, _)| id)
            .collect();
        if !non_mls_ids.is_empty() {
            return Err(CoreError::UnexpectedNonMlsProposals {
                proposal_ids: non_mls_ids,
            });
        }

        let mls = self.mls.as_mut().ok_or(CoreError::MlsGroupNotInitialized)?;

        // Drop approved entries already reflected in conversation state (stale
        // rebroadcast KPs, duplicate removes) — without this MLS would reject
        // the whole batch with "Duplicate signature key in proposals and conversation".
        let current_members = mls.members()?;
        let current_members_set = member_set(&current_members);
        let is_member = |id: &[u8]| current_members_set.contains(id);

        // Urgent (ECP-driven) freeze: restrict the batch to just the target's
        // RemoveMember. See `Conversation::urgent_commit_target`.
        let urgent_target = self.conversation.urgent_commit_target().map(|t| t.to_vec());

        // Iterate in insertion order (FIFO): library proposal IDs are
        // content-derived hashes, so sort-by-id is not temporal.
        let k_max = mls.commit_batch_max();
        let mut updates = Vec::with_capacity(self.conversation.approved_order().len().min(k_max));
        for pid in self.conversation.approved_order() {
            // Cap the batch at the MLS-reported maximum.
            if updates.len() >= k_max {
                break;
            }
            // Ids present in the order list but absent from the map are skipped.
            let Some(proposal) = self.conversation.approved_proposals().get(pid) else {
                continue;
            };
            match proposal.payload.as_ref() {
                Some(Payload::InviteMember(im)) => {
                    // Adds are never part of an urgent batch.
                    if urgent_target.is_some() {
                        continue;
                    }
                    // Already a member: the add is stale.
                    if is_member(&im.identity) {
                        continue;
                    }
                    updates.push(MlsCommitInput::Add(KeyPackageBytes::new(
                        im.key_package_bytes.clone(),
                        im.identity.clone(),
                    )));
                }
                Some(Payload::RemoveMember(rm)) => {
                    // In an urgent batch only the target's removal is kept.
                    if let Some(target) = urgent_target.as_deref()
                        && rm.identity != target
                    {
                        continue;
                    }
                    // Not a member (any more): the remove is stale.
                    if !is_member(&rm.identity) {
                        continue;
                    }
                    updates.push(MlsCommitInput::Remove(rm.identity.clone()));
                }
                _ => return Err(CoreError::InvalidConversationUpdateRequest),
            }
        }
        if updates.is_empty() {
            return Ok(None);
        }

        let MlsCommitCandidate {
            proposals: mls_proposals,
            commit,
            welcome,
        } = mls.create_commit_candidate(&updates)?;
        let candidate = CommitCandidate {
            conversation_name: self.conversation.name_bytes().to_vec(),
            mls_proposals,
            commit_message: commit,
            steward_identity: self_identity.to_vec(),
        };

        // Welcome bytes are deferred: sent from finalize_freeze_round after the
        // commit merges, so joiners can't advance epoch ahead of the steward.
        let commit_hash = compute_commit_hash(&candidate.commit_message);
        let epoch = mls.current_epoch()?;
        let outcome = self.conversation.add_freeze_candidate(
            BufferedCommitCandidate {
                // Cloned because `candidate` itself is consumed below when it
                // is converted into the outbound `AppMessage`.
                candidate_msg: candidate.clone(),
                commit_hash,
                is_local_candidate: true,
                welcome_bytes: welcome,
            },
            epoch,
        );
        // Non-Buffered outcomes are legitimate runtime states (see
        // `FreezeBufferOutcome`), not errors — log at debug.
        if !matches!(outcome, FreezeBufferOutcome::Buffered) {
            tracing::debug!(
                conversation = self.conversation.name(),
                epoch,
                ?outcome,
                "local commit candidate not buffered",
            );
        }
        info!(
            conversation = self.conversation.name(),
            epoch,
            proposals = updates.len(),
            "commit candidate created"
        );

        let candidate_msg: AppMessage = candidate.into();
        Ok(Some(OutboundPacket::new(
            candidate_msg.encode_to_vec(),
            APP_MSG_SUBTOPIC,
            self.conversation.name(),
            app_id,
        )))
    }

    /// Finalize the active freeze round by delegating to
    /// [`finalize_freeze_round`] with this handle's conversation, MLS
    /// service, steward list, and current recovery flag.
    ///
    /// # Errors
    ///
    /// [`CoreError::MlsGroupNotInitialized`] when no MLS service is
    /// attached; otherwise whatever the delegated function returns.
    pub(crate) fn finalize_freeze_round(
        &mut self,
        allow_subset_candidates: bool,
        app_id: &[u8],
        self_identity: &[u8],
    ) -> Result<FreezeFinalizeResult, CoreError> {
        // Read the mode before taking the mutable borrow on `self.mls`.
        let in_recovery = self.is_in_recovery_mode();
        let mls = self.mls.as_mut().ok_or(CoreError::MlsGroupNotInitialized)?;
        finalize_freeze_round(
            &mut self.conversation,
            mls,
            &self.steward_list,
            in_recovery,
            allow_subset_candidates,
            app_id,
            self_identity,
        )
    }

    /// Process an inbound app-subtopic payload by delegating to
    /// [`process_inbound`].
    ///
    /// # Errors
    ///
    /// [`CoreError::MlsGroupNotInitialized`] when no MLS service is
    /// attached — caller should check `mls().is_some()` first. Otherwise
    /// whatever the delegated function returns.
    pub(crate) fn process_inbound(&mut self, payload: &[u8]) -> Result<ProcessResult, CoreError> {
        let mls = self.mls.as_mut().ok_or(CoreError::MlsGroupNotInitialized)?;
        process_inbound(&mut self.conversation, mls, payload)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_fixtures::{StubPluginsFactory, StubScoring, StubStewardList, UnusedMls};

    /// Handle over conversation `"g"` with the `UnusedMls` stub attached, a
    /// member-state state machine, default config, stub scoring, and the
    /// supplied steward list.
    fn make_handle(steward_list: StubStewardList) -> ConversationHandle<StubPluginsFactory> {
        let conversation = Conversation::new("g");
        let mls = Some(UnusedMls);
        let state_machine = ConversationStateMachine::new_as_member();
        let config = ConversationConfig::default();
        ConversationHandle::new(
            conversation,
            mls,
            state_machine,
            config,
            StubScoring,
            steward_list,
        )
    }

    /// With the `member()` steward-list stub and the handle in normal mode,
    /// candidate creation must be refused with `NotASteward`.
    #[test]
    fn create_commit_candidate_errors_for_non_steward_outside_recovery() {
        let mut handle = make_handle(StubStewardList::member());

        let outcome = handle.create_commit_candidate(b"me", b"app");

        let err = outcome.expect_err("non-steward should be rejected");
        assert!(matches!(err, CoreError::NotASteward));
    }

    /// With the `steward()` stub but nothing in the approved queue,
    /// candidate creation must report `NoProposals`.
    #[test]
    fn create_commit_candidate_errors_when_no_approved_proposals() {
        let mut handle = make_handle(StubStewardList::steward());

        let outcome = handle.create_commit_candidate(b"me", b"app");

        let err = outcome.expect_err("empty approved queue should be rejected");
        assert!(matches!(err, CoreError::NoProposals));
    }

    /// Only MLS-producing payloads belong in a commit, so an
    /// emergency-criteria proposal sitting in the approved queue has to
    /// come back as `UnexpectedNonMlsProposals`. The error lists the
    /// offending proposal ids so that the orchestrator can drop them.
    #[test]
    fn create_commit_candidate_errors_on_emergency_in_approved_queue() {
        use crate::protos::de_mls::messages::v1::ViolationEvidence;

        let mut handle = make_handle(StubStewardList::steward());
        let evidence = ViolationEvidence::broken_commit(vec![0xAA], 0, Vec::<u8>::new())
            .with_creator(vec![0x01]);
        let emergency = evidence.into_update_request().unwrap();
        handle.conversation.insert_approved_proposal(50, emergency);

        let outcome = handle.create_commit_candidate(b"me", b"app");

        let err = outcome.expect_err("emergency in approved queue should be rejected");
        match err {
            CoreError::UnexpectedNonMlsProposals { proposal_ids } => {
                assert_eq!(proposal_ids, vec![50]);
            }
            err => panic!("expected UnexpectedNonMlsProposals, got {err:?}"),
        }
    }
}