Skip to main content

de_mls/core/
process_result.rs

1//! [`ProcessResult`] returned by [`process_inbound`](super::process_inbound)
2//! plus protobuf ↔ message-envelope `From` impls.
3
4use hashgraph_like_consensus::{
5    protos::consensus::v1::{Proposal, Vote},
6    types::ConsensusEvent,
7};
8
9use crate::{
10    core::{CoreError, ScoreEvent, ScoreOp},
11    protos::de_mls::messages::v1::{
12        AppMessage, BanRequest, CommitCandidate, ConversationMessage, ConversationSync,
13        ConversationUpdateRequest, EmergencyCriteriaProposal, InvitationToJoin, Outcome,
14        ProposalAdded, RemoveMember, UserKeyPackage, UserVote, ViolationEvidence, ViolationType,
15        VotePayload, WelcomeMessage, app_message, conversation_update_request, welcome_message,
16    },
17};
18
19/// Outcome of processing one inbound packet. The app layer matches this
20/// directly and dispatches the side effects.
21///
22/// Heavy protobuf payloads (`AppMessage`, `Proposal`, `Vote`,
23/// `ConversationUpdateRequest`, `ConversationSync` — each 88–144 bytes) are
24/// boxed so the enum stays small. Without boxing the enum is sized to its
25/// largest variant and every return / clone / match-move copies ~150 bytes;
26/// with boxing it drops to ~32 bytes.
27#[derive(Debug, Clone)]
28pub enum ProcessResult {
29    /// Decrypted application message ready to deliver to the UI.
30    AppMessage(Box<AppMessage>),
31
32    /// Consensus proposal from a peer — forward to the consensus service.
33    Proposal(Box<Proposal>),
34
35    /// Consensus vote from a peer — forward to the consensus service.
36    Vote(Box<Vote>),
37
38    /// We were removed from the conversation.
39    LeaveConversation,
40
41    /// Steward received a membership change (invite KP / ban) — start a vote.
42    MembershipChangeReceived(Box<ConversationUpdateRequest>),
43
44    /// Successfully joined via a welcome message; carries the conversation name.
45    JoinedConversation(String),
46
47    /// MLS state advanced (batch commit applied).
48    ConversationUpdated,
49
50    /// Remote commit candidate was buffered in the active freeze round.
51    /// `steward` is the wire-claimed identity from the candidate; the app
52    /// layer uses it for dispatch context (logging, scoring attribution).
53    CommitCandidateReceived { steward: Vec<u8> },
54
55    /// Conversation-sync message from the steward (steward list, scores, timing,
56    /// protocol flags). Meaningful only for joiners with no steward list yet.
57    ConversationSyncReceived(Box<ConversationSync>),
58
59    /// Nothing to do. The reason variant names the specific case so the
60    /// app layer can match precisely instead of relying on producer-side
61    /// log lines for context.
62    Noop(NoopReason),
63}
64
65/// Why a [`ProcessResult::Noop`] was returned. One variant per producer
66/// site so the dispatch layer can match on the specific case.
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub enum NoopReason {
69    /// Decrypted application message had no recognized payload variant.
70    UnknownAppMessage,
71    /// Fast-path proposal rejected: MLS sender doesn't match the
72    /// self-removal target.
73    FastPathRejected,
74    /// Ban request dropped: target is not a conversation member.
75    BanTargetNotMember,
76    /// Decrypt returned `Ignored` (wrong epoch or wrong conversation).
77    DecryptIgnored,
78    /// Decrypt returned a non-Application MLS payload on the app subtopic.
79    UnexpectedMlsType,
80    /// `buffer_commit_candidate` found no approved proposals to commit.
81    NoApprovedProposals,
82    /// Commit hash matches a recent committed batch — duplicate broadcast.
83    AlreadyCommitted,
84    /// Candidate carried an empty proposals or commit payload.
85    EmptyCandidatePayload,
86    /// Candidate carried an empty `steward_identity` field.
87    EmptyStewardIdentity,
88    /// Candidate's wire kind doesn't match Proposal/Commit.
89    WireKindMismatch,
90    /// Freeze round selection is locked — the buffer no longer accepts
91    /// candidates.
92    SelectionLocked,
93    /// Caller's epoch doesn't match the buffered round's epoch.
94    StaleEpoch,
95    /// Identical commit hash is already buffered for this round.
96    DuplicateBufferedHash,
97}
98
99// ── ViolationEvidence constructors ────────────────────────────────
100
101impl ViolationEvidence {
102    /// Steward included different proposal IDs than what was voted on,
103    /// or IDs match but content digest differs.
104    pub fn broken_commit(target: Vec<u8>, epoch: u64, payload: impl Into<Vec<u8>>) -> Self {
105        Self {
106            violation_type: ViolationType::BrokenCommit as i32,
107            target_member_id: target,
108            evidence_payload: payload.into(),
109            epoch,
110            creator_member_id: Vec::new(),
111        }
112    }
113
114    /// MLS payload count doesn't match proposal count,
115    /// or an MLS proposal failed to decrypt/store correctly.
116    pub fn broken_mls_proposal(target: Vec<u8>, epoch: u64, payload: impl Into<Vec<u8>>) -> Self {
117        Self {
118            violation_type: ViolationType::BrokenMlsProposal as i32,
119            target_member_id: target,
120            evidence_payload: payload.into(),
121            epoch,
122            creator_member_id: Vec::new(),
123        }
124    }
125
126    /// Steward didn't commit within the threshold duration.
127    pub fn censorship_inactivity(target: Vec<u8>, epoch: u64) -> Self {
128        Self {
129            violation_type: ViolationType::CensorshipInactivity as i32,
130            target_member_id: target,
131            evidence_payload: Vec::new(),
132            epoch,
133            creator_member_id: Vec::new(),
134        }
135    }
136
137    /// Member's peer score dropped to or below the removal threshold.
138    pub fn score_below_threshold(target: Vec<u8>, epoch: u64, current_score: i64) -> Self {
139        Self {
140            violation_type: ViolationType::ScoreBelowThreshold as i32,
141            target_member_id: target,
142            evidence_payload: current_score.to_le_bytes().to_vec(),
143            epoch,
144            creator_member_id: Vec::new(),
145        }
146    }
147
148    /// Layer 3 anti-deadlock signal — on YES the steward gate relaxes so
149    /// any member can produce the recovery commit. No specific target.
150    pub fn deadlock(epoch: u64) -> Self {
151        Self {
152            violation_type: ViolationType::Deadlock as i32,
153            target_member_id: Vec::new(),
154            evidence_payload: Vec::new(),
155            epoch,
156            creator_member_id: Vec::new(),
157        }
158    }
159
160    /// Set the creator identity on this evidence (called by app layer before voting).
161    pub fn with_creator(mut self, creator: Vec<u8>) -> Self {
162        self.creator_member_id = creator;
163        self
164    }
165
166    /// Wrap this evidence into a `ConversationUpdateRequest` for consensus voting.
167    ///
168    /// Returns an error if `creator_member_id` is empty. Call `.with_creator()` before this
169    /// method — every ECP must carry the creator identity for peer scoring (RFC §"Peer Scoring").
170    pub fn into_update_request(self) -> Result<ConversationUpdateRequest, CoreError> {
171        if self.creator_member_id.is_empty() {
172            return Err(CoreError::InvalidConversationUpdateRequest);
173        }
174        Ok(ConversationUpdateRequest {
175            payload: Some(conversation_update_request::Payload::EmergencyCriteria(
176                EmergencyCriteriaProposal {
177                    evidence: Some(self),
178                },
179            )),
180        })
181    }
182
183    /// Peer-score penalty the target takes for this violation, or `None`
184    /// for violation types that have no target-side score (`ScoreBelowThreshold`
185    /// drives a removal, not a penalty; `Deadlock` has no target;
186    /// `Unspecified` and unknown wire values are malformed).
187    pub fn target_score_event(&self) -> Option<ScoreEvent> {
188        match ViolationType::try_from(self.violation_type) {
189            Ok(ViolationType::BrokenCommit) => Some(ScoreEvent::BrokenCommit),
190            Ok(ViolationType::BrokenMlsProposal) => Some(ScoreEvent::BrokenMlsProposal),
191            Ok(ViolationType::CensorshipInactivity) => Some(ScoreEvent::CensorshipInactivity),
192            Ok(ViolationType::ScoreBelowThreshold)
193            | Ok(ViolationType::Deadlock)
194            | Ok(ViolationType::ViolationUnspecified)
195            | Err(_) => None,
196        }
197    }
198
199    /// `ScoreOp` applying [`Self::target_score_event`] to [`Self::target_member_id`].
200    /// `None` when the violation type carries no target-side score.
201    pub fn target_score_op(&self) -> Option<ScoreOp> {
202        Some(ScoreOp {
203            member_id: self.target_member_id.clone(),
204            event: self.target_score_event()?,
205        })
206    }
207}
208
209/// Build `impl From<Inner> for Envelope` where
210/// `Envelope { payload: Some(<variant path>(Inner)) }`.
211macro_rules! impl_payload_from {
212    ($envelope:ty, $( $inner:ty => $variant:path ),+ $(,)?) => {
213        $(
214            impl From<$inner> for $envelope {
215                fn from(value: $inner) -> Self {
216                    Self { payload: Some($variant(value)) }
217                }
218            }
219        )+
220    };
221}
222
223impl_payload_from!(
224    WelcomeMessage,
225    UserKeyPackage   => welcome_message::Payload::UserKeyPackage,
226    InvitationToJoin => welcome_message::Payload::InvitationToJoin,
227);
228
229impl_payload_from!(
230    AppMessage,
231    VotePayload         => app_message::Payload::VotePayload,
232    UserVote            => app_message::Payload::UserVote,
233    ConversationMessage => app_message::Payload::ConversationMessage,
234    CommitCandidate     => app_message::Payload::CommitCandidate,
235    BanRequest          => app_message::Payload::BanRequest,
236    Proposal            => app_message::Payload::Proposal,
237    Vote                => app_message::Payload::Vote,
238    ConversationSync           => app_message::Payload::ConversationSync,
239    ProposalAdded       => app_message::Payload::ProposalAdded,
240);
241
242impl From<ConsensusEvent> for Outcome {
243    fn from(ev: ConsensusEvent) -> Self {
244        match ev {
245            ConsensusEvent::ConsensusReached { result: true, .. } => Outcome::Accepted,
246            ConsensusEvent::ConsensusReached { result: false, .. } => Outcome::Rejected,
247            ConsensusEvent::ConsensusFailed { .. } => Outcome::Unspecified,
248        }
249    }
250}
251
252impl TryFrom<AppMessage> for ProcessResult {
253    type Error = CoreError;
254    fn try_from(value: AppMessage) -> Result<Self, Self::Error> {
255        match &value.payload {
256            Some(app_message::Payload::ConversationMessage(_)) => {
257                Ok(ProcessResult::AppMessage(Box::new(value)))
258            }
259            Some(app_message::Payload::Proposal(proposal)) => {
260                Ok(ProcessResult::Proposal(Box::new(proposal.clone())))
261            }
262            Some(app_message::Payload::Vote(vote)) => {
263                Ok(ProcessResult::Vote(Box::new(vote.clone())))
264            }
265            Some(app_message::Payload::BanRequest(ban_request)) => Ok(
266                ProcessResult::MembershipChangeReceived(Box::new(ConversationUpdateRequest {
267                    payload: Some(conversation_update_request::Payload::RemoveMember(
268                        RemoveMember {
269                            identity: ban_request.user_to_ban.clone(),
270                        },
271                    )),
272                })),
273            ),
274            Some(app_message::Payload::ConversationSync(sync)) => Ok(
275                ProcessResult::ConversationSyncReceived(Box::new(sync.clone())),
276            ),
277            other => {
278                tracing::debug!(
279                    payload_kind = ?other.as_ref().map(std::mem::discriminant),
280                    "app message ignored: payload variant not consumed by core dispatch"
281                );
282                Ok(ProcessResult::Noop(NoopReason::UnknownAppMessage))
283            }
284        }
285    }
286}
287
288#[cfg(test)]
289mod tests {
290    use super::*;
291
292    /// `ViolationEvidence::broken_commit` plus `with_creator` plus
293    /// `into_update_request` produce an `EmergencyCriteria` payload that
294    /// preserves target, epoch, and violation type on the wire.
295    #[test]
296    fn broken_commit_evidence_roundtrips_into_update_request() {
297        let evidence = ViolationEvidence::broken_commit(vec![0xAA, 0xBB], 5, vec![0xDE, 0xAD])
298            .with_creator(vec![0x01]);
299        let request = evidence.into_update_request().unwrap();
300
301        let Some(conversation_update_request::Payload::EmergencyCriteria(ec)) = request.payload
302        else {
303            panic!("Expected EmergencyCriteria payload");
304        };
305        let ev = ec.evidence.expect("evidence present");
306        assert_eq!(ev.violation_type, ViolationType::BrokenCommit as i32);
307        assert_eq!(ev.target_member_id, vec![0xAA, 0xBB]);
308        assert_eq!(ev.epoch, 5);
309        assert_eq!(ev.evidence_payload, vec![0xDE, 0xAD]);
310        assert_eq!(ev.creator_member_id, vec![0x01]);
311    }
312
313    /// `into_update_request` rejects evidence with no creator identity.
314    #[test]
315    fn into_update_request_errors_without_creator() {
316        let evidence = ViolationEvidence::broken_commit(vec![0xAA], 0, Vec::<u8>::new());
317        let err = evidence
318            .into_update_request()
319            .expect_err("creator required");
320        assert!(matches!(err, CoreError::InvalidConversationUpdateRequest));
321    }
322}