de_mls/app/user/
inbound.rs1use std::sync::{Arc, RwLock};
10
11use prost::Message;
12use tracing::info;
13
14use crate::{
15 app::{DispatchOutcome, LockExt, SessionRunner, User, UserError},
16 core::{
17 ConsensusPlugin, ConversationLifecycle, ConversationPluginsFactory, CoreError,
18 ProcessResult, StewardListPlugin,
19 },
20 ds::{APP_MSG_SUBTOPIC, InboundPacket, WELCOME_SUBTOPIC},
21 mls_crypto::{MlsService, key_package_bytes_from_json},
22 protos::de_mls::messages::v1::{
23 ConversationUpdateRequest, InviteMember, WelcomeMessage, conversation_update_request,
24 welcome_message,
25 },
26};
27
28impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> User<P, CP> {
29 pub async fn process_inbound_packet(&self, packet: InboundPacket) -> Result<(), UserError> {
36 let conversation_name = packet.conversation_id.clone();
37
38 if packet.app_id.as_slice() == &*self.app_id {
40 return Ok(());
41 }
42
43 let entry_arc = self
44 .lookup_entry(&conversation_name)?
45 .ok_or(UserError::ConversationNotFound)?;
46
47 match packet.subtopic.as_str() {
48 WELCOME_SUBTOPIC => {
49 self.process_welcome_packet(&conversation_name, &packet.payload, &entry_arc)
50 .await
51 }
52 APP_MSG_SUBTOPIC => {
53 let result = {
54 let mut entry = entry_arc.write_or_err("session")?;
55 if entry.handle.mls().is_none() {
56 return Ok(());
57 }
58 entry.handle.process_inbound(&packet.payload)?
59 };
60 self.finish_dispatch(&conversation_name, &entry_arc, result)
61 .await
62 }
63 other => Err(UserError::Core(CoreError::InvalidSubtopic(
64 other.to_string(),
65 ))),
66 }
67 }
68
69 pub async fn finalize_self_leave(&self, conversation_name: &str) -> Result<(), UserError> {
78 self.cleanup_consensus_scope(conversation_name).await?;
84 self.conversations
85 .write()
86 .map_err(|_| UserError::LockPoisoned("conversation registry"))?
87 .remove(conversation_name);
88 self.emit_lifecycle(ConversationLifecycle::Removed(
89 conversation_name.to_string(),
90 ));
91 Ok(())
92 }
93
94 async fn process_welcome_packet(
103 &self,
104 conversation_name: &str,
105 payload: &[u8],
106 entry_arc: &Arc<RwLock<SessionRunner<P, CP>>>,
107 ) -> Result<(), UserError> {
108 let welcome_msg = WelcomeMessage::decode(payload)?;
109 match welcome_msg.payload {
110 Some(welcome_message::Payload::UserKeyPackage(user_kp)) => {
111 let (key_package_bytes, identity) =
112 key_package_bytes_from_json(user_kp.key_package_bytes)?;
113
114 let already_member = {
115 let entry = entry_arc.read_or_err("session")?;
116 entry
117 .handle
118 .mls()
119 .map(|m| m.is_member(&identity))
120 .unwrap_or(false)
121 };
122 if already_member {
123 info!(
124 conversation = conversation_name,
125 identity = ?identity,
126 "key package skipped: already a member"
127 );
128 return Ok(());
129 }
130
131 info!(
132 conversation = conversation_name,
133 identity = ?identity,
134 "key package received"
135 );
136
137 let gur = ConversationUpdateRequest {
138 payload: Some(conversation_update_request::Payload::InviteMember(
139 InviteMember {
140 key_package_bytes,
141 identity,
142 },
143 )),
144 };
145 SessionRunner::handle_incoming_update_request(entry_arc, gur).await
146 }
147 Some(welcome_message::Payload::InvitationToJoin(invitation)) => {
148 let self_id = self.self_identity();
149 let already_in = {
150 let entry = entry_arc.read_or_err("session")?;
151 entry.handle.steward_list.is_steward(self_id) || entry.handle.mls().is_some()
152 };
153 if already_in {
154 return Ok(());
155 }
156
157 let svc = self
158 .plugins
159 .conversation_plugins
160 .welcome_mls(&invitation.mls_message_out_bytes)?;
161 let Some(svc) = svc else {
162 return Ok(());
164 };
165 let joined_name = svc.conversation_id().to_string();
166 {
167 let mut entry = entry_arc.write_or_err("session")?;
168 entry.handle.attach_mls(svc);
169 }
170 info!(
171 conversation = conversation_name,
172 "joined conversation via welcome"
173 );
174 self.finish_dispatch(
175 conversation_name,
176 entry_arc,
177 ProcessResult::JoinedConversation(joined_name),
178 )
179 .await
180 }
181 None => Ok(()),
182 }
183 }
184
185 async fn finish_dispatch(
188 &self,
189 conversation_name: &str,
190 entry_arc: &Arc<RwLock<SessionRunner<P, CP>>>,
191 result: ProcessResult,
192 ) -> Result<(), UserError> {
193 let outcome = SessionRunner::dispatch_inbound_result(entry_arc, result).await?;
194 if matches!(outcome, DispatchOutcome::LeaveRequested) {
195 self.finalize_self_leave(conversation_name).await?;
196 }
197 Ok(())
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    use crate::ds::{DeliveryService, DeliveryServiceError, OutboundPacket, SharedDeliveryService};
    use crate::test_fixtures::make_user_from_private_key;

    /// Delivery service stub: `publish` and `subscribe` always return
    /// `Ok(())` and do nothing else.
    #[derive(Debug)]
    struct NullTransport;

    impl DeliveryService for NullTransport {
        type Error = DeliveryServiceError;

        fn publish(&mut self, _packet: OutboundPacket) -> Result<(), Self::Error> {
            Ok(())
        }

        fn subscribe(&mut self, _address: &str) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// Hex-encoded private key used to build the test user.
    const ALICE_KEY: &str = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80";

    /// Registers a pending auto-vote on a freshly created conversation, runs
    /// `finalize_self_leave`, and checks that the conversation is no longer
    /// present in the registry. The final assertion covers registry eviction
    /// only.
    #[tokio::test]
    async fn finalize_self_leave_clears_pending_auto_votes() {
        let delivery: SharedDeliveryService = Arc::new(Mutex::new(NullTransport));
        let mut alice = make_user_from_private_key(ALICE_KEY, delivery);
        alice.start_conversation("test-conv", true).await.unwrap();

        let looked_up = alice.lookup_entry("test-conv").unwrap();
        let runner = looked_up.expect("creator session registered");

        // Scope the write guard so it is released before the read below.
        {
            let mut guard = runner.write().unwrap();
            guard.register_auto_vote(42, Duration::from_secs(600), true);
        }
        let registered = runner.read().unwrap().pending_auto_votes.contains_key(&42);
        assert!(registered, "auto-vote must be registered before self-leave");

        alice.finalize_self_leave("test-conv").await.unwrap();

        let remaining = alice.lookup_entry("test-conv").unwrap();
        assert!(
            remaining.is_none(),
            "registry entry must be evicted on self-leave"
        );
    }
}