hashgraph_like_consensus/service_consensus.rs
1use alloy_signer::Signer;
2
3use crate::{
4 error::ConsensusError,
5 events::ConsensusEventBus,
6 protos::consensus::v1::{Proposal, Vote},
7 scope::ConsensusScope,
8 service::ConsensusService,
9 session::{ConsensusConfig, ConsensusSession},
10 storage::ConsensusStorage,
11 types::CreateProposalRequest,
12 utils::{build_vote, validate_proposal_timestamp, validate_vote},
13};
14
15impl<Scope, S, E> ConsensusService<Scope, S, E>
16where
17 Scope: ConsensusScope,
18 S: ConsensusStorage<Scope>,
19 E: ConsensusEventBus<Scope>,
20{
21 /// Create a new proposal and start the voting process.
22 ///
23 /// This creates the proposal, sets up a session to track votes, and schedules automatic
24 /// timeout handling. The proposal will expire after the time specified in the request.
25 ///
26 /// Configuration is resolved from: proposal config > scope config > global default.
27 /// If no config is provided, the scope's default configuration is used.
28 ///
29 /// # Examples
30 ///
31 /// ```rust
32 /// use hashgraph_like_consensus::{scope::ScopeID, scope_config::NetworkType,
33 /// service::DefaultConsensusService, types::CreateProposalRequest};
34 ///
35 /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
36 /// let service = DefaultConsensusService::default();
37 /// let scope = ScopeID::from("my_scope");
38 ///
39 /// service
40 /// .scope(&scope)
41 /// .await?
42 /// .with_network_type(NetworkType::P2P)
43 /// .with_threshold(0.75)
44 /// .initialize()
45 /// .await?;
46 ///
47 /// let request = CreateProposalRequest::new(
48 /// "Test Proposal".to_string(),
49 /// "payload".to_string(),
50 /// vec![0u8; 20],
51 /// 3,
52 /// 100,
53 /// true,
54 /// )?;
55 /// let proposal = service.create_proposal(&scope, request).await?;
56 /// Ok(())
57 /// }
58 /// ```
59 pub async fn create_proposal(
60 &self,
61 scope: &Scope,
62 request: CreateProposalRequest,
63 ) -> Result<Proposal, ConsensusError> {
64 self.create_proposal_with_config(scope, request, None).await
65 }
66
67 /// Create a new proposal with explicit configuration override.
68 ///
69 /// This allows you to override the scope's default configuration for a specific proposal.
70 /// The override takes precedence over scope config.
71 ///
72 /// # Examples
73 ///
74 /// ```rust
75 /// use hashgraph_like_consensus::{scope::ScopeID,
76 /// service::DefaultConsensusService, session::ConsensusConfig, types::CreateProposalRequest};
77 ///
78 /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
79 /// let service = DefaultConsensusService::default();
80 /// let scope = ScopeID::from("my_scope");
81 /// let request = CreateProposalRequest::new(
82 /// "Test Proposal".to_string(),
83 /// "payload".to_string(),
84 /// vec![0u8; 20],
85 /// 3,
86 /// 100,
87 /// true,
88 /// )?;
89 ///
90 /// let proposal = service.create_proposal_with_config(
91 /// &scope,
92 /// request,
93 /// Some(ConsensusConfig::p2p())
94 /// ).await?;
95 ///
96 /// let request2 = CreateProposalRequest::new(
97 /// "Another Proposal".to_string(),
98 /// "payload2".to_string(),
99 /// vec![0u8; 20],
100 /// 3,
101 /// 100,
102 /// true,
103 /// )?;
104 /// let proposal2 = service.create_proposal_with_config(
105 /// &scope,
106 /// request2,
107 /// None
108 /// ).await?;
109 /// Ok(())
110 /// }
111 /// ```
112 pub async fn create_proposal_with_config(
113 &self,
114 scope: &Scope,
115 request: CreateProposalRequest,
116 config: Option<ConsensusConfig>,
117 ) -> Result<Proposal, ConsensusError> {
118 let proposal = request.into_proposal()?;
119
120 // Resolve config: override > scope config > global default, aligning timeout with proposal
121 let config = self.resolve_config(scope, config, Some(&proposal)).await?;
122
123 let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
124 self.save_session(scope, session).await?;
125 self.trim_scope_sessions(scope).await?;
126
127 Ok(proposal)
128 }
129
130 /// Cast your vote on a proposal (yes or no).
131 ///
132 /// Vote is cryptographically signed and linked to previous votes in the hashgraph.
133 /// Returns the signed vote, which you can then send to other peers in the network.
134 /// Each voter can only vote once per proposal.
135 pub async fn cast_vote<SN: Signer + Sync>(
136 &self,
137 scope: &Scope,
138 proposal_id: u32,
139 choice: bool,
140 signer: SN,
141 ) -> Result<Vote, ConsensusError> {
142 let session = self.get_session(scope, proposal_id).await?;
143
144 validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
145
146 let voter_address = signer.address().as_slice().to_vec();
147 if session.votes.contains_key(&voter_address) {
148 return Err(ConsensusError::UserAlreadyVoted);
149 }
150
151 let vote = build_vote(&session.proposal, choice, signer).await?;
152 let vote_clone = vote.clone();
153
154 let transition = self
155 .update_session(scope, proposal_id, move |session| {
156 session.add_vote(vote_clone)
157 })
158 .await?;
159
160 self.handle_transition(scope, proposal_id, transition);
161 Ok(vote)
162 }
163
164 /// Cast a vote and immediately get back the updated proposal.
165 ///
166 /// This is a convenience method that combines `cast_vote` and fetching the proposal.
167 /// Useful for proposal creator as they can immediately see the proposal with their vote
168 /// and share it with other peers.
169 pub async fn cast_vote_and_get_proposal<SN: Signer + Sync>(
170 &self,
171 scope: &Scope,
172 proposal_id: u32,
173 choice: bool,
174 signer: SN,
175 ) -> Result<Proposal, ConsensusError> {
176 self.cast_vote(scope, proposal_id, choice, signer).await?;
177 let session = self.get_session(scope, proposal_id).await?;
178 Ok(session.proposal)
179 }
180
181 /// Process a proposal you received from another peer in the network.
182 ///
183 /// This validates the proposal and all its votes (signatures, vote chains, timestamps),
184 /// then stores it locally.
185 /// If it necessary the consensus configuration is resolved from the proposal.
186 /// If the proposal already has enough votes, consensus is reached
187 /// immediately and an event is emitted.
188 pub async fn process_incoming_proposal(
189 &self,
190 scope: &Scope,
191 proposal: Proposal,
192 ) -> Result<(), ConsensusError> {
193 if self.get_session(scope, proposal.proposal_id).await.is_ok() {
194 return Err(ConsensusError::ProposalAlreadyExist);
195 }
196
197 let config = self.resolve_config(scope, None, Some(&proposal)).await?;
198 let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
199 self.handle_transition(scope, session.proposal.proposal_id, transition);
200
201 self.save_session(scope, session).await?;
202 self.trim_scope_sessions(scope).await?;
203 Ok(())
204 }
205
206 /// Process a vote you received from another peer.
207 ///
208 /// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
209 /// If this vote brings the total to the consensus threshold, consensus is reached and
210 /// an event is emitted.
211 pub async fn process_incoming_vote(
212 &self,
213 scope: &Scope,
214 vote: Vote,
215 ) -> Result<(), ConsensusError> {
216 let session = self.get_session(scope, vote.proposal_id).await?;
217 validate_vote(
218 &vote,
219 session.proposal.expiration_timestamp,
220 session.proposal.timestamp,
221 )?;
222 let proposal_id = vote.proposal_id;
223 let transition = self
224 .update_session(scope, proposal_id, move |session| session.add_vote(vote))
225 .await?;
226
227 self.handle_transition(scope, proposal_id, transition);
228 Ok(())
229 }
230}