1use alloy::signers::local::{LocalSignerError, PrivateKeySigner};
7use std::{collections::HashMap, str::FromStr, sync::Arc};
8use tokio::sync::RwLock;
9use tracing::{error, info};
10
11use hashgraph_like_consensus::{service::DefaultConsensusService, types::ConsensusEvent};
12
13use crate::app::state_machine::{
14 CommitTimeoutStatus, GroupConfig, GroupState, GroupStateMachine, StateChangeHandler,
15};
16use crate::core::{
17 self, CoreError, DeMlsProvider, DefaultProvider, GroupEventHandler, GroupHandle,
18 create_batch_proposals,
19};
20use crate::ds::InboundPacket;
21use crate::mls_crypto::{
22 IdentityError, MemoryDeMlsStorage, MlsService, format_wallet_address, parse_wallet_to_bytes,
23};
24use crate::protos::de_mls::messages::v1::{
25 AppMessage, BanRequest, ConversationMessage, GroupUpdateRequest, RemoveMember,
26 group_update_request,
27};
28
29struct GroupEntry {
31 handle: GroupHandle,
32 state_machine: GroupStateMachine,
33}
34
35pub struct User<P: DeMlsProvider, H: GroupEventHandler, SCH: StateChangeHandler> {
49 mls_service: MlsService<P::Storage>,
50 groups: Arc<RwLock<HashMap<String, GroupEntry>>>,
51 consensus_service: Arc<P::Consensus>,
52 eth_signer: PrivateKeySigner,
53 handler: Arc<H>,
54 state_handler: Arc<SCH>,
55 default_group_config: GroupConfig,
57}
58
59impl<P: DeMlsProvider, H: GroupEventHandler + 'static, SCH: StateChangeHandler + 'static>
60 User<P, H, SCH>
61{
62 fn new_with_config(
72 mls_service: MlsService<P::Storage>,
73 consensus_service: Arc<P::Consensus>,
74 eth_signer: PrivateKeySigner,
75 handler: Arc<H>,
76 state_handler: Arc<SCH>,
77 default_group_config: GroupConfig,
78 ) -> Self {
79 Self {
80 mls_service,
81 groups: Arc::new(RwLock::new(HashMap::new())),
82 consensus_service,
83 eth_signer,
84 handler,
85 state_handler,
86 default_group_config,
87 }
88 }
89
90 pub fn identity_string(&self) -> String {
92 self.mls_service.wallet_hex()
93 }
94
95 pub async fn create_group(
103 &mut self,
104 group_name: &str,
105 is_creation: bool,
106 ) -> Result<(), UserError> {
107 self.create_group_with_config(group_name, is_creation, self.default_group_config.clone())
108 .await
109 }
110
111 pub async fn create_group_with_config(
118 &mut self,
119 group_name: &str,
120 is_creation: bool,
121 config: GroupConfig,
122 ) -> Result<(), UserError> {
123 let mut groups = self.groups.write().await;
124 if groups.contains_key(group_name) {
125 return Err(UserError::GroupAlreadyExists);
126 }
127
128 let (handle, state_machine) = if is_creation {
129 let handle = core::create_group(group_name, &self.mls_service)?;
130 let state_machine = GroupStateMachine::new_as_steward_with_config(config);
131 (handle, state_machine)
132 } else {
133 let handle = core::prepare_to_join(group_name);
134 let state_machine = GroupStateMachine::new_as_pending_join_with_config(config);
135 (handle, state_machine)
136 };
137
138 let initial_state = state_machine.current_state();
139 groups.insert(
140 group_name.to_string(),
141 GroupEntry {
142 handle,
143 state_machine,
144 },
145 );
146 drop(groups);
147
148 self.state_handler
149 .on_state_changed(group_name, initial_state)
150 .await;
151
152 Ok(())
153 }
154
155 pub async fn leave_group(&mut self, group_name: &str) -> Result<(), UserError> {
163 info!("[leave_group]: Leaving group {group_name}");
164
165 let (old_state, new_state) = {
166 let mut groups = self.groups.write().await;
167 let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
168 let old_state = entry.state_machine.current_state();
169 match old_state {
170 GroupState::PendingJoin => {
171 groups.remove(group_name);
173 drop(groups);
174 self.handler.on_leave_group(group_name).await?;
175 return Ok(());
176 }
177 GroupState::Leaving => return Err(UserError::AlreadyLeaving),
178 _ => {
179 entry.state_machine.start_leaving();
180 }
181 }
182 (old_state, entry.state_machine.current_state())
183 };
184
185 self.state_handler
187 .on_state_changed(group_name, new_state.clone())
188 .await;
189
190 info!(
191 "[leave_group]: Transitioning from {old_state} to Leaving, sending self-removal for group {group_name}"
192 );
193
194 self.start_voting_on_request_background(
197 group_name.to_string(),
198 GroupUpdateRequest {
199 payload: Some(group_update_request::Payload::RemoveMember(RemoveMember {
200 identity: parse_wallet_to_bytes(&self.identity_string())?,
201 })),
202 },
203 )
204 .await?;
205 Ok(())
206 }
207
208 pub async fn get_group_state(&self, group_name: &str) -> Result<GroupState, UserError> {
210 let groups = self.groups.read().await;
211 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
212 Ok(entry.state_machine.current_state())
213 }
214
215 pub async fn list_groups(&self) -> Vec<String> {
217 let groups = self.groups.read().await;
218 groups.keys().cloned().collect()
219 }
220
221 pub async fn is_steward_for_group(&self, group_name: &str) -> Result<bool, UserError> {
223 let groups = self.groups.read().await;
224 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
225 Ok(entry.state_machine.is_steward())
226 }
227
228 pub async fn get_group_members(&self, group_name: &str) -> Result<Vec<String>, UserError> {
230 let groups = self.groups.read().await;
231 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
232
233 if !entry.handle.is_mls_initialized() {
234 return Ok(Vec::new());
235 }
236
237 let members = core::group_members(&entry.handle, &self.mls_service)?;
238 Ok(members
239 .into_iter()
240 .map(|raw| format_wallet_address(raw.as_slice()).to_string())
241 .collect())
242 }
243
244 pub async fn get_approved_proposal_for_current_epoch(
246 &self,
247 group_name: &str,
248 ) -> Result<Vec<GroupUpdateRequest>, UserError> {
249 let groups = self.groups.read().await;
250 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
251 let approved_proposals = core::approved_proposals(&entry.handle);
252 let display_proposals: Vec<GroupUpdateRequest> = approved_proposals.into_values().collect();
253 Ok(display_proposals)
254 }
255
256 pub async fn get_epoch_history(
260 &self,
261 group_name: &str,
262 ) -> Result<Vec<Vec<GroupUpdateRequest>>, UserError> {
263 let groups = self.groups.read().await;
264 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
265 let history = core::epoch_history(&entry.handle);
266 Ok(history
267 .iter()
268 .map(|batch| batch.values().cloned().collect())
269 .collect())
270 }
271
272 pub async fn send_kp_message(&self, group_name: &str) -> Result<(), UserError> {
276 let groups = self.groups.read().await;
277 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
278 let packet = core::build_key_package_message(&entry.handle, &self.mls_service)?;
279 self.handler.on_outbound(group_name, packet).await?;
280 Ok(())
281 }
282
283 pub async fn send_app_message(
288 &self,
289 group_name: &str,
290 message: Vec<u8>,
291 ) -> Result<(), UserError> {
292 let groups = self.groups.read().await;
293 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
294
295 let state = entry.state_machine.current_state();
297 if state == GroupState::PendingJoin || state == GroupState::Waiting {
298 return Err(UserError::GroupBlocked(state.to_string()));
299 }
300
301 let app_msg: AppMessage = ConversationMessage {
302 message,
303 sender: self.identity_string(),
304 group_name: group_name.to_string(),
305 }
306 .into();
307
308 let packet = core::build_message(&entry.handle, &self.mls_service, &app_msg)?;
309 self.handler.on_outbound(group_name, packet).await?;
310 Ok(())
311 }
312
313 pub async fn process_ban_request(
317 &mut self,
318 ban_request: BanRequest,
319 group_name: &str,
320 ) -> Result<(), UserError> {
321 {
323 let groups = self.groups.read().await;
324 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
325 let state = entry.state_machine.current_state();
326 if state != GroupState::Working {
327 return Err(UserError::GroupBlocked(state.to_string()));
328 }
329 }
330
331 self.start_voting_on_request_background(
332 group_name.to_string(),
333 GroupUpdateRequest {
334 payload: Some(group_update_request::Payload::RemoveMember(RemoveMember {
335 identity: parse_wallet_to_bytes(ban_request.user_to_ban.as_str())?,
336 })),
337 },
338 )
339 .await?;
340
341 Ok(())
342 }
343
344 pub async fn check_pending_join(&self, group_name: &str) -> bool {
355 let (state, expired) = {
357 let groups = self.groups.read().await;
358 match groups.get(group_name) {
359 Some(entry) => (
360 entry.state_machine.current_state(),
361 entry.state_machine.is_pending_join_expired(),
362 ),
363 None => return false, }
365 };
366
367 if state != GroupState::PendingJoin {
369 return false;
370 }
371
372 if expired {
374 info!(
375 "[check_pending_join]: Join timed out for group {group_name} \
376 (time-based fallback)"
377 );
378 self.groups.write().await.remove(group_name);
379 let _ = self.handler.on_leave_group(group_name).await;
380 return false;
381 }
382
383 true }
385
386 pub async fn time_until_next_epoch(&self, group_name: &str) -> Option<std::time::Duration> {
391 let groups = self.groups.read().await;
392 let entry = groups.get(group_name)?;
393 entry.state_machine.time_until_next_boundary()
394 }
395
396 pub async fn check_commit_timeout(&self, group_name: &str) -> CommitTimeoutStatus {
410 let has_proposals = {
411 let mut groups = self.groups.write().await;
412 let entry = match groups.get_mut(group_name) {
413 Some(e) => e,
414 None => return CommitTimeoutStatus::NotWaiting,
415 };
416
417 if entry.state_machine.current_state() != GroupState::Waiting {
418 return CommitTimeoutStatus::NotWaiting;
419 }
420 if !entry.state_machine.is_commit_timed_out() {
421 return CommitTimeoutStatus::StillWaiting;
422 }
423
424 let has_proposals = core::approved_proposals_count(&entry.handle) > 0;
425
426 if has_proposals {
427 if let Err(e) = core::request_steward_reelection::<P>(
430 &mut entry.handle,
431 group_name,
432 &*self.consensus_service,
433 &*self.handler,
434 )
435 .await
436 {
437 error!("[check_commit_timeout] Failed to request steward re-election: {e}");
438 }
439 }
440
441 entry.state_machine.sync_epoch_boundary();
442 entry.state_machine.start_working();
443 has_proposals
444 };
445
446 self.state_handler
447 .on_state_changed(group_name, GroupState::Working)
448 .await;
449
450 CommitTimeoutStatus::TimedOut { has_proposals }
451 }
452
453 pub async fn start_member_epoch(&self, group_name: &str) -> Result<bool, UserError> {
463 let mut groups = self.groups.write().await;
464 let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
465
466 if entry.state_machine.is_steward() {
468 return Ok(false);
469 }
470
471 let state = entry.state_machine.current_state();
473 if state == GroupState::PendingJoin || state == GroupState::Leaving {
474 return Ok(false);
475 }
476
477 let proposal_count = core::approved_proposals_count(&entry.handle);
479 let entered_waiting = entry.state_machine.check_epoch_boundary(proposal_count);
480
481 if entered_waiting {
482 let new_state = entry.state_machine.current_state();
483 info!(
484 "[start_member_epoch]: Entered Waiting at epoch boundary for group {group_name} \
485 ({proposal_count} approved proposals)"
486 );
487 drop(groups);
488 self.state_handler
489 .on_state_changed(group_name, new_state.clone())
490 .await;
491 }
492
493 Ok(entered_waiting)
494 }
495
496 pub async fn start_steward_epoch(&mut self, group_name: &str) -> Result<(), UserError> {
498 let has_proposals = {
500 let groups = self.groups.read().await;
501 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
502 core::approved_proposals_count(&entry.handle) > 0
503 };
504
505 if !has_proposals {
506 return Ok(());
507 }
508
509 {
511 let mut groups = self.groups.write().await;
512 let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
513 entry.state_machine.start_steward_epoch()?;
514 }
515 self.state_handler
516 .on_state_changed(group_name, GroupState::Waiting)
517 .await;
518
519 let messages = {
521 let mut groups = self.groups.write().await;
522 let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
523 create_batch_proposals(&mut entry.handle, &self.mls_service)?
524 };
525
526 for message in messages {
528 self.handler.on_outbound(group_name, message).await?;
529 }
530
531 {
533 let mut groups = self.groups.write().await;
534 if let Some(entry) = groups.get_mut(group_name) {
535 entry.state_machine.start_working();
536 }
537 }
538 self.state_handler
539 .on_state_changed(group_name, GroupState::Working)
540 .await;
541
542 Ok(())
543 }
544
545 pub async fn start_voting_on_request_background(
546 &self,
547 group_name: String,
548 upd_request: GroupUpdateRequest,
549 ) -> Result<(), UserError> {
550 let expected_voters = {
551 let groups = self.groups.read().await;
552 let entry = groups.get(&group_name).ok_or(UserError::GroupNotFound)?;
553 let members = core::group_members(&entry.handle, &self.mls_service)?;
554 members.len() as u32
555 };
556 let identity_string = self.mls_service.wallet_hex();
557
558 let consensus = Arc::clone(&self.consensus_service);
559 let groups = Arc::clone(&self.groups);
560 let handler = Arc::clone(&self.handler);
561 let group_name_clone = group_name.clone();
562
563 tokio::spawn(async move {
564 let result: Result<(), CoreError> = async {
565 let proposal_id = core::start_voting::<P>(
566 &group_name,
567 &upd_request,
568 expected_voters,
569 identity_string,
570 &*consensus,
571 &*handler,
572 )
573 .await?;
574
575 {
576 let mut groups = groups.write().await;
577 if let Some(entry) = groups.get_mut(&group_name) {
578 entry
579 .handle
580 .store_voting_proposal(proposal_id, upd_request);
581 } else {
582 error!(
583 "[start_voting_on_request]: Group {group_name} missing during proposal store"
584 );
585 }
586 }
587
588 info!(
589 "[start_voting_on_request]: Stored voting proposal: {proposal_id}"
590 );
591
592 Ok(())
593 }
594 .await;
595
596 if let Err(err) = result {
597 error!("[start_voting_on_request]: background task failed: {err}");
598 handler
599 .on_error(&group_name_clone, "Start voting", &err.to_string())
600 .await;
601 }
602 });
603
604 Ok(())
605 }
606
607 pub async fn process_user_vote(
614 &mut self,
615 group_name: &str,
616 proposal_id: u32,
617 vote: bool,
618 ) -> Result<(), UserError> {
619 let groups = self.groups.read().await;
620 let entry = groups.get(group_name).ok_or(UserError::GroupNotFound)?;
621
622 let state = entry.state_machine.current_state();
624 if state == GroupState::Waiting {
625 return Err(UserError::GroupBlocked(state.to_string()));
626 }
627
628 let handle = entry.handle.clone();
629 drop(groups);
630
631 core::cast_vote::<P, _, _>(
632 &handle,
633 group_name,
634 proposal_id,
635 vote,
636 &*self.consensus_service,
637 self.eth_signer.clone(),
638 &self.mls_service,
639 &*self.handler,
640 )
641 .await?;
642 Ok(())
643 }
644
645 pub async fn process_inbound_packet(&self, packet: InboundPacket) -> Result<(), UserError> {
649 let group_name = packet.group_id.clone();
650
651 {
653 let groups = self.groups.read().await;
654 if let Some(entry) = groups.get(&group_name) {
655 if packet.app_id == entry.handle.app_id() {
656 return Ok(());
657 }
658 } else {
659 return Err(UserError::GroupNotFound);
660 }
661 }
662
663 let (result, handle) = {
665 let mut groups = self.groups.write().await;
666 let entry = groups
667 .get_mut(&group_name)
668 .ok_or(UserError::GroupNotFound)?;
669
670 let result = core::process_inbound(
671 &mut entry.handle,
672 &packet.payload,
673 &packet.subtopic,
674 &self.mls_service,
675 )?;
676
677 (result, entry.handle.clone())
678 };
679
680 let action = core::dispatch_result::<P, _>(
681 &handle,
682 &group_name,
683 result,
684 &*self.consensus_service,
685 &*self.handler,
686 &self.mls_service,
687 )
688 .await?;
689
690 match action {
691 core::DispatchAction::StartVoting(request) => {
692 self.start_voting_on_request_background(group_name, request)
693 .await?;
694 }
695 core::DispatchAction::GroupUpdated => {
696 let transitioned = {
699 let mut groups = self.groups.write().await;
700 if let Some(entry) = groups.get_mut(&group_name) {
701 let state = entry.state_machine.current_state();
702 if state == GroupState::Working || state == GroupState::Waiting {
703 entry.state_machine.sync_epoch_boundary();
704 entry.state_machine.start_working();
705 true
706 } else {
707 false
708 }
709 } else {
710 false
711 }
712 };
713
714 if transitioned {
715 self.state_handler
716 .on_state_changed(&group_name, GroupState::Working)
717 .await;
718 }
719 }
720 core::DispatchAction::LeaveGroup => {
721 self.groups.write().await.remove(&group_name);
722 self.handler.on_leave_group(&group_name).await?;
723 }
724 core::DispatchAction::JoinedGroup => {
725 let state = {
727 let mut groups = self.groups.write().await;
728 if let Some(entry) = groups.get_mut(&group_name) {
729 entry.state_machine.sync_epoch_boundary();
730 entry.state_machine.start_working();
731 Some(entry.state_machine.current_state())
732 } else {
733 None
734 }
735 };
736 if let Some(state) = state {
737 self.state_handler
738 .on_state_changed(&group_name, state)
739 .await;
740 }
741 }
742 core::DispatchAction::Done => {}
743 }
744 Ok(())
745 }
746
747 pub async fn handle_consensus_event(
751 &mut self,
752 group_name: &str,
753 event: ConsensusEvent,
754 ) -> Result<(), UserError> {
755 let mut groups = self.groups.write().await;
756 let entry = groups.get_mut(group_name).ok_or(UserError::GroupNotFound)?;
757
758 core::handle_consensus_event::<P>(
759 &mut entry.handle,
760 group_name,
761 event,
762 &*self.consensus_service,
763 )
764 .await?;
765
766 Ok(())
767 }
768}
769
770impl<H: GroupEventHandler + 'static, SCH: StateChangeHandler + 'static>
773 User<DefaultProvider, H, SCH>
774{
775 pub fn with_private_key(
785 private_key: &str,
786 consensus_service: Arc<DefaultConsensusService>,
787 handler: Arc<H>,
788 state_handler: Arc<SCH>,
789 ) -> Result<Self, UserError> {
790 Self::with_private_key_and_config(
791 private_key,
792 consensus_service,
793 handler,
794 state_handler,
795 GroupConfig::default(),
796 )
797 }
798
799 pub fn with_private_key_and_config(
810 private_key: &str,
811 consensus_service: Arc<DefaultConsensusService>,
812 handler: Arc<H>,
813 state_handler: Arc<SCH>,
814 default_group_config: GroupConfig,
815 ) -> Result<Self, UserError> {
816 let signer = PrivateKeySigner::from_str(private_key)?;
817 let user_address = signer.address();
818
819 let mls_service = MlsService::new(MemoryDeMlsStorage::new());
820 mls_service
821 .init(user_address)
822 .map_err(|e| UserError::Core(e.into()))?;
823
824 Ok(Self::new_with_config(
825 mls_service,
826 consensus_service,
827 signer,
828 handler,
829 state_handler,
830 default_group_config,
831 ))
832 }
833}
834
835#[derive(Debug, thiserror::Error)]
839pub enum UserError {
840 #[error("Group already exists")]
841 GroupAlreadyExists,
842
843 #[error("Group not found")]
844 GroupNotFound,
845
846 #[error("Already leaving this group")]
847 AlreadyLeaving,
848
849 #[error("Cannot send message: group is in {0} state")]
850 GroupBlocked(String),
851
852 #[error("Core error: {0}")]
853 Core(#[from] CoreError),
854
855 #[error("State machine error: {0}")]
856 StateMachine(#[from] super::state_machine::StateMachineError),
857
858 #[error("Consensus error: {0}")]
859 Consensus(#[from] hashgraph_like_consensus::error::ConsensusError),
860
861 #[error("Message error: {0}")]
862 Message(#[from] prost::DecodeError),
863
864 #[error("System time error: {0}")]
865 SystemTime(#[from] std::time::SystemTimeError),
866
867 #[error("Signer error: {0}")]
868 Signer(#[from] LocalSignerError),
869
870 #[error("Identity error: {0}")]
871 Identity(#[from] IdentityError),
872}
873
874impl UserError {
875 pub fn is_fatal(&self) -> bool {
880 matches!(self, UserError::GroupNotFound | UserError::AlreadyLeaving)
881 }
882}