Skip to main content

de_mls/app/user/
inbound.rs

1//! User-side inbound entry point.
2//!
3//! `process_inbound_packet` owns echo dedup + name routing; the welcome
4//! subtopic handler reaches the per-conv plugin factory (`welcome_mls`),
5//! which lives at the User layer. App-message packets are handed off to
6//! [`SessionRunner::dispatch_inbound_result`] for MLS processing and
7//! per-conversation dispatch.
8
9use std::sync::{Arc, RwLock};
10
11use prost::Message;
12use tracing::info;
13
14use crate::{
15    app::{DispatchOutcome, LockExt, SessionRunner, User, UserError},
16    core::{
17        ConsensusPlugin, ConversationLifecycle, ConversationPluginsFactory, CoreError,
18        ProcessResult, StewardListPlugin,
19    },
20    ds::{APP_MSG_SUBTOPIC, InboundPacket, WELCOME_SUBTOPIC},
21    mls_crypto::{MlsService, key_package_bytes_from_json},
22    protos::de_mls::messages::v1::{
23        ConversationUpdateRequest, InviteMember, WelcomeMessage, conversation_update_request,
24        welcome_message,
25    },
26};
27
28impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
29    // ── Public API ───────────────────────────────────────────────────
30
31    /// Process an inbound packet. The User-level entry point owns echo
32    /// dedup, name-based routing, and the welcome subtopic's plug-in-
33    /// factory access. App-message packets are handed off to the session
34    /// for MLS processing and dispatch.
35    pub async fn process_inbound_packet(&self, packet: InboundPacket) -> Result<(), UserError> {
36        let conversation_name = packet.conversation_id.clone();
37
38        // Echo dedup: drop our own messages received back from pub/sub.
39        if packet.app_id.as_slice() == &*self.app_id {
40            return Ok(());
41        }
42
43        let entry_arc = self
44            .lookup_entry(&conversation_name)?
45            .ok_or(UserError::ConversationNotFound)?;
46
47        match packet.subtopic.as_str() {
48            WELCOME_SUBTOPIC => {
49                self.process_welcome_packet(&conversation_name, &packet.payload, &entry_arc)
50                    .await
51            }
52            APP_MSG_SUBTOPIC => {
53                let result = {
54                    let mut entry = entry_arc.write_or_err("session")?;
55                    if entry.handle.mls().is_none() {
56                        return Ok(());
57                    }
58                    entry.handle.process_inbound(&packet.payload)?
59                };
60                self.finish_dispatch(&conversation_name, &entry_arc, result)
61                    .await
62            }
63            other => Err(UserError::Core(CoreError::InvalidSubtopic(
64                other.to_string(),
65            ))),
66        }
67    }
68
69    /// User-side completion of `LeaveConversation`: drop the entry from
70    /// the registry, clean up the consensus scope, and broadcast removal.
71    /// The session-side teardown (emit `Leaving`, delete MLS state) runs
72    /// inside `SessionRunner::dispatch_inbound_result` /
73    /// [`SessionRunner::poll_freeze_status`] /
74    /// [`SessionRunner::check_pending_join`]; this method is the cleanup
75    /// callers run when those signal "registry should be removed"
76    /// (`DispatchOutcome::LeaveRequested` or `PendingJoinTick::Expired`).
77    pub async fn finalize_self_leave(&self, conversation_name: &str) -> Result<(), UserError> {
78        // Cancel auto-vote timers before removing the registry entry —
79        // `cleanup_consensus_scope` finds the runner via `lookup_entry` and
80        // aborts its timers. If the entry is gone first, the lookup returns
81        // `None` and the timers leak (still scheduled, will fire against a
82        // conversation we've left).
83        self.cleanup_consensus_scope(conversation_name).await?;
84        self.conversations
85            .write()
86            .map_err(|_| UserError::LockPoisoned("conversation registry"))?
87            .remove(conversation_name);
88        self.emit_lifecycle(ConversationLifecycle::Removed(
89            conversation_name.to_string(),
90        ));
91        Ok(())
92    }
93
94    // ── Private ──────────────────────────────────────────────────────
95
96    /// Welcome-subtopic dispatch. Two payload kinds:
97    /// - `UserKeyPackage` — a peer wants to join. If we already have an MLS
98    ///   service for this conversation and the candidate isn't a member, surface
99    ///   it as a membership-change request.
100    /// - `InvitationToJoin` — try the welcome factory. If it returns
101    ///   `Some(svc)`, attach to the runner and fire the join flow.
102    async fn process_welcome_packet(
103        &self,
104        conversation_name: &str,
105        payload: &[u8],
106        entry_arc: &Arc<RwLock<SessionRunner<P, CP>>>,
107    ) -> Result<(), UserError> {
108        let welcome_msg = WelcomeMessage::decode(payload)?;
109        match welcome_msg.payload {
110            Some(welcome_message::Payload::UserKeyPackage(user_kp)) => {
111                let (key_package_bytes, identity) =
112                    key_package_bytes_from_json(user_kp.key_package_bytes)?;
113
114                let already_member = {
115                    let entry = entry_arc.read_or_err("session")?;
116                    entry
117                        .handle
118                        .mls()
119                        .map(|m| m.is_member(&identity))
120                        .unwrap_or(false)
121                };
122                if already_member {
123                    info!(
124                        conversation = conversation_name,
125                        identity = ?identity,
126                        "key package skipped: already a member"
127                    );
128                    return Ok(());
129                }
130
131                info!(
132                    conversation = conversation_name,
133                    identity = ?identity,
134                    "key package received"
135                );
136
137                let gur = ConversationUpdateRequest {
138                    payload: Some(conversation_update_request::Payload::InviteMember(
139                        InviteMember {
140                            key_package_bytes,
141                            identity,
142                        },
143                    )),
144                };
145                SessionRunner::handle_incoming_update_request(entry_arc, gur).await
146            }
147            Some(welcome_message::Payload::InvitationToJoin(invitation)) => {
148                let self_id = self.self_identity();
149                let already_in = {
150                    let entry = entry_arc.read_or_err("session")?;
151                    entry.handle.steward_list.is_steward(self_id) || entry.handle.mls().is_some()
152                };
153                if already_in {
154                    return Ok(());
155                }
156
157                let svc = self
158                    .plugins
159                    .conversation_plugins
160                    .welcome_mls(&invitation.mls_message_out_bytes)?;
161                let Some(svc) = svc else {
162                    // Welcome wasn't for us.
163                    return Ok(());
164                };
165                let joined_name = svc.conversation_id().to_string();
166                {
167                    let mut entry = entry_arc.write_or_err("session")?;
168                    entry.handle.attach_mls(svc);
169                }
170                info!(
171                    conversation = conversation_name,
172                    "joined conversation via welcome"
173                );
174                self.finish_dispatch(
175                    conversation_name,
176                    entry_arc,
177                    ProcessResult::JoinedConversation(joined_name),
178                )
179                .await
180            }
181            None => Ok(()),
182        }
183    }
184
185    /// Drive the session-side dispatcher and finish lifecycle work on the
186    /// User side when the session signals `LeaveRequested`.
187    async fn finish_dispatch(
188        &self,
189        conversation_name: &str,
190        entry_arc: &Arc<RwLock<SessionRunner<P, CP>>>,
191        result: ProcessResult,
192    ) -> Result<(), UserError> {
193        let outcome = SessionRunner::dispatch_inbound_result(entry_arc, result).await?;
194        if matches!(outcome, DispatchOutcome::LeaveRequested) {
195            self.finalize_self_leave(conversation_name).await?;
196        }
197        Ok(())
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204    use std::sync::Mutex;
205    use std::time::Duration;
206
207    use crate::ds::{DeliveryService, DeliveryServiceError, OutboundPacket, SharedDeliveryService};
208    use crate::test_fixtures::make_user_from_private_key;
209
210    /// Transport stub: `publish` is a no-op so an outbound never reaches a
211    /// real network; `subscribe` is a no-op too.
212    #[derive(Debug)]
213    struct NullTransport;
214    impl DeliveryService for NullTransport {
215        type Error = DeliveryServiceError;
216
217        fn publish(&mut self, _: OutboundPacket) -> Result<(), Self::Error> {
218            Ok(())
219        }
220
221        fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
222            Ok(())
223        }
224    }
225
    /// Hex-encoded private key handed to `make_user_from_private_key` to
    /// build the test user. Fixture value for tests only.
    const ALICE_KEY: &str = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";
227
228    /// Self-leave must abort auto-vote timers — otherwise they fire
229    /// against a conversation we've left.
230    /// Self-leave must drop the pending auto-vote registry — otherwise
231    /// the next `tick_deadlines` would fire against a conversation
232    /// we've left.
233    #[tokio::test]
234    async fn finalize_self_leave_clears_pending_auto_votes() {
235        let transport: SharedDeliveryService = Arc::new(Mutex::new(NullTransport));
236        let mut user = make_user_from_private_key(ALICE_KEY, transport);
237        user.start_conversation("test-conv", true).await.unwrap();
238
239        let session = user
240            .lookup_entry("test-conv")
241            .unwrap()
242            .expect("creator session registered");
243
244        // Seed a pending auto-vote with a far-future fire-at so the
245        // assertion isn't sensitive to wall-clock drift.
246        session
247            .write()
248            .unwrap()
249            .register_auto_vote(42, Duration::from_secs(600), true);
250        assert!(
251            session.read().unwrap().pending_auto_votes.contains_key(&42),
252            "auto-vote must be registered before self-leave"
253        );
254
255        user.finalize_self_leave("test-conv").await.unwrap();
256
257        // Session entry is gone from the registry, so the conversation's
258        // pending auto-votes can no longer fire from a poll cycle on this
259        // user.
260        assert!(
261            user.lookup_entry("test-conv").unwrap().is_none(),
262            "registry entry must be evicted on self-leave"
263        );
264    }
265}