// File: de_mls/app/user.rs

1//! User struct for managing multiple groups.
2//!
3//! This is the main entry point for the application layer,
4//! managing multiple `GroupHandle`s and coordinating operations.
5
6use alloy::signers::local::{LocalSignerError, PrivateKeySigner};
7use std::{collections::HashMap, str::FromStr, sync::Arc};
8use tokio::sync::RwLock;
9use tracing::{error, info};
10
11use hashgraph_like_consensus::{service::DefaultConsensusService, types::ConsensusEvent};
12
13use crate::app::state_machine::{
14    CommitTimeoutStatus, GroupConfig, GroupState, GroupStateMachine, StateChangeHandler,
15};
16use crate::core::{
17    self, CoreError, DeMlsProvider, DefaultProvider, GroupEventHandler, GroupHandle,
18    create_batch_proposals,
19};
20use crate::ds::InboundPacket;
21use crate::mls_crypto::{
22    IdentityError, MemoryDeMlsStorage, MlsService, format_wallet_address, parse_wallet_to_bytes,
23};
24use crate::protos::de_mls::messages::v1::{
25    AppMessage, BanRequest, ConversationMessage, GroupUpdateRequest, RemoveMember,
26    group_update_request,
27};
28
/// Internal state for a group managed by User.
struct GroupEntry {
    // Core-layer MLS group handle (crypto state, proposals, message building).
    handle: GroupHandle,
    // App-layer lifecycle state machine (PendingJoin / Working / Waiting / Leaving).
    state_machine: GroupStateMachine,
}
34
/// User manages multiple MLS groups.
///
/// This struct provides the main application-level interface for
/// working with MLS groups, handling consensus, and processing messages.
///
/// The type parameter `P` determines which service implementations are used
/// (storage, consensus). Use [`DefaultProvider`] for standard configuration.
///
/// The type parameter `H` is the handler that receives output events
/// (outbound packets, app messages, leave/join notifications).
///
/// The type parameter `SCH` is the handler for state machine state changes
/// (an app-layer concern, separate from the core `GroupEventHandler`).
pub struct User<P: DeMlsProvider, H: GroupEventHandler, SCH: StateChangeHandler> {
    // MLS cryptographic operations (key packages, messages, wallet identity).
    mls_service: MlsService<P::Storage>,
    // Group registry; Arc-shared so background voting tasks can access it.
    groups: Arc<RwLock<HashMap<String, GroupEntry>>>,
    // Consensus service used to start voting and cast votes on proposals.
    consensus_service: Arc<P::Consensus>,
    // Ethereum signer used when casting votes.
    eth_signer: PrivateKeySigner,
    // Receives outbound packets, app messages, and leave/error notifications.
    handler: Arc<H>,
    // Receives app-layer state machine transitions (e.g. for UI updates).
    state_handler: Arc<SCH>,
    /// Default config for new groups (can be overridden per-group).
    default_group_config: GroupConfig,
}
58
59impl<P: DeMlsProvider, H: GroupEventHandler + 'static, SCH: StateChangeHandler + 'static>
60    User<P, H, SCH>
61{
62    /// Create a new User instance with pre-built services and custom default group config.
63    ///
64    /// # Arguments
65    /// * `mls_service` - MLS service for cryptographic operations
66    /// * `consensus_service` - Consensus service
67    /// * `eth_signer` - Ethereum signer for voting
68    /// * `handler` - Event handler for output events
69    /// * `state_handler` - Handler for state machine state changes
70    /// * `default_group_config` - Default config applied to new groups
71    fn new_with_config(
72        mls_service: MlsService<P::Storage>,
73        consensus_service: Arc<P::Consensus>,
74        eth_signer: PrivateKeySigner,
75        handler: Arc<H>,
76        state_handler: Arc<SCH>,
77        default_group_config: GroupConfig,
78    ) -> Self {
79        Self {
80            mls_service,
81            groups: Arc::new(RwLock::new(HashMap::new())),
82            consensus_service,
83            eth_signer,
84            handler,
85            state_handler,
86            default_group_config,
87        }
88    }
89
    /// Get the user's identity string (wallet address as checksummed hex).
    ///
    /// This is the identity used as `sender` on conversation messages and
    /// for self-removal when leaving a group.
    pub fn identity_string(&self) -> String {
        self.mls_service.wallet_hex()
    }
94
95    // ─────────────────────────── Group Management ───────────────────────────
96
97    /// Create or join a group with the user's default config.
98    ///
99    /// # Arguments
100    /// * `group_name` - The name of the group
101    /// * `is_creation` - `true` to create a new group as steward, `false` to prepare to join
102    pub async fn create_group(
103        &mut self,
104        group_name: &str,
105        is_creation: bool,
106    ) -> Result<(), UserError> {
107        self.create_group_with_config(group_name, is_creation, self.default_group_config.clone())
108            .await
109    }
110
111    /// Create or join a group with custom config.
112    ///
113    /// # Arguments
114    /// * `group_name` - The name of the group
115    /// * `is_creation` - `true` to create a new group as steward, `false` to prepare to join
116    /// * `config` - Group-specific configuration
117    pub async fn create_group_with_config(
118        &mut self,
119        group_name: &str,
120        is_creation: bool,
121        config: GroupConfig,
122    ) -> Result<(), UserError> {
123        let mut groups = self.groups.write().await;
124        if groups.contains_key(group_name) {
125            return Err(UserError::GroupAlreadyExists);
126        }
127
128        let (handle, state_machine) = if is_creation {
129            let handle = core::create_group(group_name, &self.mls_service)?;
130            let state_machine = GroupStateMachine::new_as_steward_with_config(config);
131            (handle, state_machine)
132        } else {
133            let handle = core::prepare_to_join(group_name);
134            let state_machine = GroupStateMachine::new_as_pending_join_with_config(config);
135            (handle, state_machine)
136        };
137
138        let initial_state = state_machine.current_state();
139        groups.insert(
140            group_name.to_string(),
141            GroupEntry {
142                handle,
143                state_machine,
144            },
145        );
146        drop(groups);
147
148        self.state_handler
149            .on_state_changed(group_name, initial_state)
150            .await;
151
152        Ok(())
153    }
154
    /// Leave a group.
    ///
    /// For `PendingJoin` state: immediate cleanup (no MLS state exists).
    /// For `Leaving` state: error (already leaving).
    /// For `Working`/`Waiting`: transitions to `Leaving` and sends a self-removal
    /// ban request. Actual cleanup happens when the removal commit arrives
    /// via `DispatchAction::LeaveGroup`.
    ///
    /// # Errors
    /// * [`UserError::GroupNotFound`] if the group is not registered.
    /// * [`UserError::AlreadyLeaving`] if a leave is already in progress.
    pub async fn leave_group(&mut self, group_name: &str) -> Result<(), UserError> {
        info!("[leave_group]: Leaving group {group_name}");

        // Resolve the state transition under the write lock; the lock is
        // released (end of block or explicit drop) before handler callbacks run.
        let (old_state, new_state) = {
            let mut groups = self.groups.write().await;
            let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
            let old_state = entry.state_machine.current_state();
            match old_state {
                GroupState::PendingJoin => {
                    // No MLS state — immediate cleanup
                    groups.remove(group_name);
                    drop(groups);
                    self.handler.on_leave_group(group_name).await?;
                    return Ok(());
                }
                GroupState::Leaving => return Err(UserError::AlreadyLeaving),
                _ => {
                    entry.state_machine.start_leaving();
                }
            }
            (old_state, entry.state_machine.current_state())
        };

        // Notify UI of state change
        self.state_handler
            .on_state_changed(group_name, new_state.clone())
            .await;

        info!(
            "[leave_group]: Transitioning from {old_state} to Leaving, sending self-removal for group {group_name}"
        );

        // Send self-removal directly (bypass process_ban_request's Working-state guard,
        // since we intentionally set Leaving before submitting the removal request).
        self.start_voting_on_request_background(
            group_name.to_string(),
            GroupUpdateRequest {
                payload: Some(group_update_request::Payload::RemoveMember(RemoveMember {
                    identity: parse_wallet_to_bytes(&self.identity_string())?,
                })),
            },
        )
        .await?;
        Ok(())
    }
207
208    /// Get the state of a group.
209    pub async fn get_group_state(&self, group_name: &str) -> Result<GroupState, UserError> {
210        let groups = self.groups.read().await;
211        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
212        Ok(entry.state_machine.current_state())
213    }
214
215    /// List all group names.
216    pub async fn list_groups(&self) -> Vec<String> {
217        let groups = self.groups.read().await;
218        groups.keys().cloned().collect()
219    }
220
221    /// Check if the user is steward for a group.
222    pub async fn is_steward_for_group(&self, group_name: &str) -> Result<bool, UserError> {
223        let groups = self.groups.read().await;
224        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
225        Ok(entry.state_machine.is_steward())
226    }
227
228    /// Get the members of a group.
229    pub async fn get_group_members(&self, group_name: &str) -> Result<Vec<String>, UserError> {
230        let groups = self.groups.read().await;
231        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
232
233        if !entry.handle.is_mls_initialized() {
234            return Ok(Vec::new());
235        }
236
237        let members = core::group_members(&entry.handle, &self.mls_service)?;
238        Ok(members
239            .into_iter()
240            .map(|raw| format_wallet_address(raw.as_slice()).to_string())
241            .collect())
242    }
243
244    /// Get current epoch proposals for a group.
245    pub async fn get_approved_proposal_for_current_epoch(
246        &self,
247        group_name: &str,
248    ) -> Result<Vec<GroupUpdateRequest>, UserError> {
249        let groups = self.groups.read().await;
250        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
251        let approved_proposals = core::approved_proposals(&entry.handle);
252        let display_proposals: Vec<GroupUpdateRequest> = approved_proposals.into_values().collect();
253        Ok(display_proposals)
254    }
255
256    /// Get epoch history for a group (past batches of approved proposals, most recent last).
257    ///
258    /// Returns up to the last 10 epoch batches for UI display.
259    pub async fn get_epoch_history(
260        &self,
261        group_name: &str,
262    ) -> Result<Vec<Vec<GroupUpdateRequest>>, UserError> {
263        let groups = self.groups.read().await;
264        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
265        let history = core::epoch_history(&entry.handle);
266        Ok(history
267            .iter()
268            .map(|batch| batch.values().cloned().collect())
269            .collect())
270    }
271
272    // ─────────────────────────── Messaging ───────────────────────────
273
274    /// Build and send a key package message for a group via the handler.
275    pub async fn send_kp_message(&self, group_name: &str) -> Result<(), UserError> {
276        let groups = self.groups.read().await;
277        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
278        let packet = core::build_key_package_message(&entry.handle, &self.mls_service)?;
279        self.handler.on_outbound(group_name, packet).await?;
280        Ok(())
281    }
282
283    /// Send a conversation message to a group.
284    ///
285    /// Allowed in `Working` and `Leaving` states (user is still a group member).
286    /// Blocked in `PendingJoin` (no MLS state) and `Waiting` (epoch freeze).
287    pub async fn send_app_message(
288        &self,
289        group_name: &str,
290        message: Vec<u8>,
291    ) -> Result<(), UserError> {
292        let groups = self.groups.read().await;
293        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
294
295        // Check if group is in a state where sending is allowed
296        let state = entry.state_machine.current_state();
297        if state == GroupState::PendingJoin || state == GroupState::Waiting {
298            return Err(UserError::GroupBlocked(state.to_string()));
299        }
300
301        let app_msg: AppMessage = ConversationMessage {
302            message,
303            sender: self.identity_string(),
304            group_name: group_name.to_string(),
305        }
306        .into();
307
308        let packet = core::build_message(&entry.handle, &self.mls_service, &app_msg)?;
309        self.handler.on_outbound(group_name, packet).await?;
310        Ok(())
311    }
312
313    /// Process a ban request.
314    ///
315    /// Returns an error if the group is blocked (PendingJoin, Waiting, or Leaving state).
316    pub async fn process_ban_request(
317        &mut self,
318        ban_request: BanRequest,
319        group_name: &str,
320    ) -> Result<(), UserError> {
321        // Check if group is in a state where operations are allowed
322        {
323            let groups = self.groups.read().await;
324            let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
325            let state = entry.state_machine.current_state();
326            if state != GroupState::Working {
327                return Err(UserError::GroupBlocked(state.to_string()));
328            }
329        }
330
331        self.start_voting_on_request_background(
332            group_name.to_string(),
333            GroupUpdateRequest {
334                payload: Some(group_update_request::Payload::RemoveMember(RemoveMember {
335                    identity: parse_wallet_to_bytes(ban_request.user_to_ban.as_str())?,
336                })),
337            },
338        )
339        .await?;
340
341        Ok(())
342    }
343
344    // ─────────────────────────── Steward Operations ───────────────────────────
345
    /// Check if still in pending join state.
    ///
    /// Called periodically while in PendingJoin state to:
    /// 1. Detect when the member has joined (state changed to Working)
    /// 2. Check for timeout (time-based fallback if group is quiet after rejection)
    ///
    /// Returns `true` if still waiting (PendingJoin), `false` if no longer pending
    /// (either joined, timed out, or group not found).
    pub async fn check_pending_join(&self, group_name: &str) -> bool {
        // First check current state
        // (read lock only; expiry is a pure query on the state machine).
        let (state, expired) = {
            let groups = self.groups.read().await;
            match groups.get(group_name) {
                Some(entry) => (
                    entry.state_machine.current_state(),
                    entry.state_machine.is_pending_join_expired(),
                ),
                None => return false, // Group already removed
            }
        };

        // If not in PendingJoin, we're done (either joined or left)
        if state != GroupState::PendingJoin {
            return false;
        }

        // Check timeout (time-based fallback)
        if expired {
            info!(
                "[check_pending_join]: Join timed out for group {group_name} \
                 (time-based fallback)"
            );
            // Remove the group and notify the handler; the leave notification
            // is best-effort here (result intentionally discarded).
            self.groups.write().await.remove(group_name);
            let _ = self.handler.on_leave_group(group_name).await;
            return false;
        }

        true // Still waiting
    }
385
386    /// Get the time until the next epoch boundary for a group.
387    ///
388    /// Returns `None` if the group doesn't exist or hasn't synced yet.
389    /// Returns `Some(Duration::ZERO)` if we're already past the boundary.
390    pub async fn time_until_next_epoch(&self, group_name: &str) -> Option<std::time::Duration> {
391        let groups = self.groups.read().await;
392        let entry = groups.get(group_name)?;
393        entry.state_machine.time_until_next_boundary()
394    }
395
    /// Check if the commit has timed out while in Waiting state.
    ///
    /// Returns a [`CommitTimeoutStatus`] indicating:
    /// - `NotWaiting` — not in Waiting state (nothing to check)
    /// - `StillWaiting` — in Waiting but timeout not reached yet
    /// - `TimedOut { has_proposals }` — timeout reached, state reverted to Working
    ///
    /// When timed out, checks if approved proposals still exist:
    /// - If proposals exist: steward failed to commit (steward fault)
    /// - If no proposals: false alarm (proposals cleared by other means)
    ///
    /// In both cases the member is unblocked (reverted to Working) and the
    /// epoch boundary is re-synced to now.
    pub async fn check_commit_timeout(&self, group_name: &str) -> CommitTimeoutStatus {
        // Early returns inside this block release the write lock before
        // returning; only the TimedOut path falls through to the notification.
        let has_proposals = {
            let mut groups = self.groups.write().await;
            let entry = match groups.get_mut(group_name) {
                Some(e) => e,
                None => return CommitTimeoutStatus::NotWaiting,
            };

            if entry.state_machine.current_state() != GroupState::Waiting {
                return CommitTimeoutStatus::NotWaiting;
            }
            if !entry.state_machine.is_commit_timed_out() {
                return CommitTimeoutStatus::StillWaiting;
            }

            let has_proposals = core::approved_proposals_count(&entry.handle) > 0;

            if has_proposals {
                // Steward fault: failed to commit pending proposals.
                // Request re-election (clears proposals to break the timeout loop).
                if let Err(e) = core::request_steward_reelection::<P>(
                    &mut entry.handle,
                    group_name,
                    &*self.consensus_service,
                    &*self.handler,
                )
                .await
                {
                    // Log-and-continue: the member must still be unblocked below.
                    error!("[check_commit_timeout] Failed to request steward re-election: {e}");
                }
            }

            // Unblock the member regardless of the re-election outcome.
            entry.state_machine.sync_epoch_boundary();
            entry.state_machine.start_working();
            has_proposals
        };

        // Notify UI outside the lock.
        self.state_handler
            .on_state_changed(group_name, GroupState::Working)
            .await;

        CommitTimeoutStatus::TimedOut { has_proposals }
    }
452
    /// Start a member epoch check.
    ///
    /// Non-steward members call this at the epoch boundary (not before).
    /// If they have approved proposals, they transition to Waiting state expecting a commit.
    ///
    /// Returns `true` if entered Waiting state (caller should poll for commit timeout),
    /// `false` otherwise (no polling needed).
    ///
    /// This method does nothing for stewards or members in PendingJoin/Leaving state.
    ///
    /// # Errors
    /// Returns [`UserError::GroupNotFound`] if the group is not registered.
    pub async fn start_member_epoch(&self, group_name: &str) -> Result<bool, UserError> {
        let mut groups = self.groups.write().await;
        let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;

        // Stewards manage their own epoch
        if entry.state_machine.is_steward() {
            return Ok(false);
        }

        // Skip if not yet joined or leaving
        let state = entry.state_machine.current_state();
        if state == GroupState::PendingJoin || state == GroupState::Leaving {
            return Ok(false);
        }

        // Check if we've reached the epoch boundary
        let proposal_count = core::approved_proposals_count(&entry.handle);
        let entered_waiting = entry.state_machine.check_epoch_boundary(proposal_count);

        if entered_waiting {
            let new_state = entry.state_machine.current_state();
            info!(
                "[start_member_epoch]: Entered Waiting at epoch boundary for group {group_name} \
                 ({proposal_count} approved proposals)"
            );
            // Release the write lock before the handler callback.
            drop(groups);
            self.state_handler
                .on_state_changed(group_name, new_state.clone())
                .await;
        }

        Ok(entered_waiting)
    }
495
496    /// Start a steward epoch.
497    pub async fn start_steward_epoch(&mut self, group_name: &str) -> Result<(), UserError> {
498        // Check if there are proposals to commit
499        let has_proposals = {
500            let groups = self.groups.read().await;
501            let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
502            core::approved_proposals_count(&entry.handle) > 0
503        };
504
505        if !has_proposals {
506            return Ok(());
507        }
508
509        // Transition to Waiting and notify UI before creating batch
510        {
511            let mut groups = self.groups.write().await;
512            let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
513            entry.state_machine.start_steward_epoch()?;
514        }
515        self.state_handler
516            .on_state_changed(group_name, GroupState::Waiting)
517            .await;
518
519        // Create and send batch proposals (group is blocked during this)
520        let messages = {
521            let mut groups = self.groups.write().await;
522            let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
523            create_batch_proposals(&mut entry.handle, &self.mls_service)?
524        };
525
526        // TODO: here can be a deadlock if the handler return error, because steward stay in the waiting state
527        for message in messages {
528            self.handler.on_outbound(group_name, message).await?;
529        }
530
531        // Transition back to Working and notify
532        {
533            let mut groups = self.groups.write().await;
534            if let Some(entry) = groups.get_mut(group_name) {
535                entry.state_machine.start_working();
536            }
537        }
538        self.state_handler
539            .on_state_changed(group_name, GroupState::Working)
540            .await;
541
542        Ok(())
543    }
544
    /// Start consensus voting on `upd_request` for `group_name` in a background task.
    ///
    /// Synchronously snapshots the current member count (the expected voter
    /// set) and this user's identity, then spawns a task that starts voting
    /// via the consensus service and stores the resulting proposal id on the
    /// group handle. Failures inside the spawned task are reported through
    /// `handler.on_error`, not returned to the caller.
    ///
    /// # Errors
    /// Only errors from the synchronous setup phase are returned:
    /// [`UserError::GroupNotFound`] or a core error while listing members.
    pub async fn start_voting_on_request_background(
        &self,
        group_name: String,
        upd_request: GroupUpdateRequest,
    ) -> Result<(), UserError> {
        // Snapshot the voter count before spawning; membership may change later.
        let expected_voters = {
            let groups = self.groups.read().await;
            let entry = groups.get(&group_name).ok_or(UserError::GroupNotFound)?;
            let members = core::group_members(&entry.handle, &self.mls_service)?;
            members.len() as u32
        };
        let identity_string = self.mls_service.wallet_hex();

        // Clone the Arcs the task needs; it must not borrow `self`.
        let consensus = Arc::clone(&self.consensus_service);
        let groups = Arc::clone(&self.groups);
        let handler = Arc::clone(&self.handler);
        let group_name_clone = group_name.clone();

        tokio::spawn(async move {
            // Inner async block so a single `?`-chain result can be reported once.
            let result: Result<(), CoreError> = async {
                let proposal_id = core::start_voting::<P>(
                    &group_name,
                    &upd_request,
                    expected_voters,
                    identity_string,
                    &*consensus,
                    &*handler,
                )
                .await?;

                // Record the proposal on the handle so later votes can find it.
                {
                    let mut groups = groups.write().await;
                    if let Some(entry) = groups.get_mut(&group_name) {
                        entry
                            .handle
                            .store_voting_proposal(proposal_id, upd_request);
                    } else {
                        // Group was removed between spawn and store — log only.
                        error!(
                            "[start_voting_on_request]: Group {group_name} missing during proposal store"
                        );
                    }
                }

                info!(
                    "[start_voting_on_request]: Stored voting proposal: {proposal_id}"
                );

                Ok(())
            }
            .await;

            // Surface background failures to the app via the error callback.
            if let Err(err) = result {
                error!("[start_voting_on_request]: background task failed: {err}");
                handler
                    .on_error(&group_name_clone, "Start voting", &err.to_string())
                    .await;
            }
        });

        Ok(())
    }
606
607    // ─────────────────────────── Voting ───────────────────────────
608
609    /// Process a user vote.
610    ///
611    /// Allowed in `Working`, `Leaving`, and `PendingJoin` states.
612    /// Blocked in `Waiting` state (epoch freeze — vote is sent as MLS message).
613    pub async fn process_user_vote(
614        &mut self,
615        group_name: &str,
616        proposal_id: u32,
617        vote: bool,
618    ) -> Result<(), UserError> {
619        let groups = self.groups.read().await;
620        let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
621
622        // Block voting during Waiting state (epoch freeze)
623        let state = entry.state_machine.current_state();
624        if state == GroupState::Waiting {
625            return Err(UserError::GroupBlocked(state.to_string()));
626        }
627
628        let handle = entry.handle.clone();
629        drop(groups);
630
631        core::cast_vote::<P, _, _>(
632            &handle,
633            group_name,
634            proposal_id,
635            vote,
636            &*self.consensus_service,
637            self.eth_signer.clone(),
638            &self.mls_service,
639            &*self.handler,
640        )
641        .await?;
642        Ok(())
643    }
644
645    // ─────────────────────────── Inbound Processing ───────────────────────────
646
    /// Process an inbound packet.
    ///
    /// Flow: drop self-originated packets (same `app_id`), run the core
    /// inbound processing under the write lock, then dispatch the resulting
    /// action (start voting, sync after a commit, leave, or finish joining).
    ///
    /// # Errors
    /// * [`UserError::GroupNotFound`] if the packet's group is not registered.
    /// * Core/handler errors from processing or dispatching.
    pub async fn process_inbound_packet(&self, packet: InboundPacket) -> Result<(), UserError> {
        let group_name = packet.group_id.clone();

        // Check if message is from same app instance
        {
            let groups = self.groups.read().await;
            if let Some(entry) = groups.get(&group_name) {
                if packet.app_id == entry.handle.app_id() {
                    return Ok(());
                }
            } else {
                return Err(UserError::GroupNotFound);
            }
        }

        // Process the packet
        // (clone the handle so dispatch can run without holding the lock).
        let (result, handle) = {
            let mut groups = self.groups.write().await;
            let entry = groups
                .get_mut(&group_name)
                .ok_or(UserError::GroupNotFound)?;

            let result = core::process_inbound(
                &mut entry.handle,
                &packet.payload,
                &packet.subtopic,
                &self.mls_service,
            )?;

            (result, entry.handle.clone())
        };

        let action = core::dispatch_result::<P, _>(
            &handle,
            &group_name,
            result,
            &*self.consensus_service,
            &*self.handler,
            &self.mls_service,
        )
        .await?;

        match action {
            core::DispatchAction::StartVoting(request) => {
                // New group-update request — put it to a consensus vote.
                self.start_voting_on_request_background(group_name, request)
                    .await?;
            }
            core::DispatchAction::GroupUpdated => {
                // Batch commit received — sync epoch boundary and transition to Working.
                // Skip if in PendingJoin or Leaving (those states have their own flows).
                let transitioned = {
                    let mut groups = self.groups.write().await;
                    if let Some(entry) = groups.get_mut(&group_name) {
                        let state = entry.state_machine.current_state();
                        if state == GroupState::Working || state == GroupState::Waiting {
                            entry.state_machine.sync_epoch_boundary();
                            entry.state_machine.start_working();
                            true
                        } else {
                            false
                        }
                    } else {
                        false
                    }
                };

                if transitioned {
                    self.state_handler
                        .on_state_changed(&group_name, GroupState::Working)
                        .await;
                }
            }
            core::DispatchAction::LeaveGroup => {
                // Our removal commit arrived — final cleanup for leave_group.
                self.groups.write().await.remove(&group_name);
                self.handler.on_leave_group(&group_name).await?;
            }
            core::DispatchAction::JoinedGroup => {
                // Welcome received and joined - sync epoch boundary and transition to Working
                let state = {
                    let mut groups = self.groups.write().await;
                    if let Some(entry) = groups.get_mut(&group_name) {
                        entry.state_machine.sync_epoch_boundary();
                        entry.state_machine.start_working();
                        Some(entry.state_machine.current_state())
                    } else {
                        None
                    }
                };
                if let Some(state) = state {
                    self.state_handler
                        .on_state_changed(&group_name, state)
                        .await;
                }
            }
            core::DispatchAction::Done => {}
        }
        Ok(())
    }
746
747    // ─────────────────────────── Consensus Events ───────────────────────────
748
749    /// Handle a consensus event.
750    pub async fn handle_consensus_event(
751        &mut self,
752        group_name: &str,
753        event: ConsensusEvent,
754    ) -> Result<(), UserError> {
755        let mut groups = self.groups.write().await;
756        let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
757
758        core::handle_consensus_event::<P>(
759            &mut entry.handle,
760            group_name,
761            event,
762            &*self.consensus_service,
763        )
764        .await?;
765
766        Ok(())
767    }
768}
769
770// ─────────────────────────── DefaultProvider Convenience ───────────────────────────
771
772impl<H: GroupEventHandler + 'static, SCH: StateChangeHandler + 'static>
773    User<DefaultProvider, H, SCH>
774{
775    /// Convenience constructor for the default provider with default group config.
776    ///
777    /// Creates a User with MLS service and the given consensus service.
778    ///
779    /// # Arguments
780    /// * `private_key` - Ethereum private key as hex string
781    /// * `consensus_service` - The default consensus service
782    /// * `handler` - Event handler for output events
783    /// * `state_handler` - Handler for state machine state changes
784    pub fn with_private_key(
785        private_key: &str,
786        consensus_service: Arc<DefaultConsensusService>,
787        handler: Arc<H>,
788        state_handler: Arc<SCH>,
789    ) -> Result<Self, UserError> {
790        Self::with_private_key_and_config(
791            private_key,
792            consensus_service,
793            handler,
794            state_handler,
795            GroupConfig::default(),
796        )
797    }
798
799    /// Convenience constructor for the default provider with custom group config.
800    ///
801    /// Creates a User with MLS service and the given consensus service.
802    ///
803    /// # Arguments
804    /// * `private_key` - Ethereum private key as hex string
805    /// * `consensus_service` - The default consensus service
806    /// * `handler` - Event handler for output events
807    /// * `state_handler` - Handler for state machine state changes
808    /// * `default_group_config` - Default config applied to new groups
809    pub fn with_private_key_and_config(
810        private_key: &str,
811        consensus_service: Arc<DefaultConsensusService>,
812        handler: Arc<H>,
813        state_handler: Arc<SCH>,
814        default_group_config: GroupConfig,
815    ) -> Result<Self, UserError> {
816        let signer = PrivateKeySigner::from_str(private_key)?;
817        let user_address = signer.address();
818
819        let mls_service = MlsService::new(MemoryDeMlsStorage::new());
820        mls_service
821            .init(user_address)
822            .map_err(|e| UserError::Core(e.into()))?;
823
824        Ok(Self::new_with_config(
825            mls_service,
826            consensus_service,
827            signer,
828            handler,
829            state_handler,
830            default_group_config,
831        ))
832    }
833}
834
835// ─────────────────────────── Errors ───────────────────────────
836
/// Errors from User operations.
#[derive(Debug, thiserror::Error)]
pub enum UserError {
    /// A group with this name is already registered on this user.
    #[error("Group already exists")]
    GroupAlreadyExists,

    /// No group with this name is registered on this user.
    #[error("Group not found")]
    GroupNotFound,

    /// `leave_group` was called while a leave is already in progress.
    #[error("Already leaving this group")]
    AlreadyLeaving,

    /// The group's current state (carried as a string) forbids the operation.
    #[error("Cannot send message: group is in {0} state")]
    GroupBlocked(String),

    /// Error propagated from the core layer.
    #[error("Core error: {0}")]
    Core(#[from] CoreError),

    /// Invalid state transition in the group state machine.
    #[error("State machine error: {0}")]
    StateMachine(#[from] super::state_machine::StateMachineError),

    /// Error from the consensus service.
    #[error("Consensus error: {0}")]
    Consensus(#[from] hashgraph_like_consensus::error::ConsensusError),

    /// Failed to decode a protobuf message.
    #[error("Message error: {0}")]
    Message(#[from] prost::DecodeError),

    /// System clock error while computing time-based values.
    #[error("System time error: {0}")]
    SystemTime(#[from] std::time::SystemTimeError),

    /// Invalid private key / signing failure.
    #[error("Signer error: {0}")]
    Signer(#[from] LocalSignerError),

    /// Wallet-identity parsing or formatting failure.
    #[error("Identity error: {0}")]
    Identity(#[from] IdentityError),
}
873
874impl UserError {
875    /// Returns `true` if this error is fatal and the operation should not be retried.
876    ///
877    /// Fatal errors indicate the group no longer exists or is in an unrecoverable state.
878    /// Non-fatal errors (network issues, temporary failures) can be retried.
879    pub fn is_fatal(&self) -> bool {
880        matches!(self, UserError::GroupNotFound | UserError::AlreadyLeaving)
881    }
882}