hashgraph_like_consensus/service_consensus.rs
1//! Implementation of [`ConsensusServiceAPI`] for [`ConsensusService`].
2
3use alloy_signer::Signer;
4
5use crate::{
6 api::ConsensusServiceAPI,
7 error::ConsensusError,
8 events::ConsensusEventBus,
9 protos::consensus::v1::{Proposal, Vote},
10 scope::ConsensusScope,
11 service::ConsensusService,
12 session::{ConsensusConfig, ConsensusSession},
13 storage::ConsensusStorage,
14 types::CreateProposalRequest,
15 utils::{build_vote, validate_proposal_timestamp, validate_vote},
16};
17
18impl<Scope, S, E> ConsensusServiceAPI<Scope, S, E> for ConsensusService<Scope, S, E>
19where
20 Scope: ConsensusScope,
21 S: ConsensusStorage<Scope>,
22 E: ConsensusEventBus<Scope>,
23{
24 /// Create a new proposal and start the voting process.
25 ///
26 /// This creates the proposal, sets up a session to track votes, and schedules automatic
27 /// timeout handling. The proposal will expire after the time specified in the request.
28 ///
29 /// Configuration is resolved from: proposal config > scope config > global default.
30 /// If no config is provided, the scope's default configuration is used.
31 ///
32 /// # Examples
33 ///
34 /// ```rust
35 /// use hashgraph_like_consensus::{api::ConsensusServiceAPI, scope::ScopeID,
36 /// scope_config::NetworkType, service::DefaultConsensusService, types::CreateProposalRequest};
37 ///
38 /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
39 /// let service = DefaultConsensusService::default();
40 /// let scope = ScopeID::from("my_scope");
41 ///
42 /// service
43 /// .scope(&scope)
44 /// .await?
45 /// .with_network_type(NetworkType::P2P)
46 /// .with_threshold(0.75)
47 /// .initialize()
48 /// .await?;
49 ///
50 /// let request = CreateProposalRequest::new(
51 /// "Test Proposal".to_string(),
52 /// b"payload".to_vec(),
53 /// vec![0u8; 20],
54 /// 3,
55 /// 100,
56 /// true,
57 /// )?;
58 /// let proposal = service.create_proposal(&scope, request).await?;
59 /// Ok(())
60 /// }
61 /// ```
62 async fn create_proposal(
63 &self,
64 scope: &Scope,
65 request: CreateProposalRequest,
66 ) -> Result<Proposal, ConsensusError> {
67 self.create_proposal_with_config(scope, request, None).await
68 }
69
70 /// Create a new proposal with explicit configuration override.
71 ///
72 /// This allows you to override the scope's default configuration for a specific proposal.
73 /// The override takes precedence over scope config.
74 ///
75 /// # Examples
76 ///
77 /// ```rust
78 /// use hashgraph_like_consensus::{api::ConsensusServiceAPI, scope::ScopeID,
79 /// service::DefaultConsensusService, session::ConsensusConfig, types::CreateProposalRequest};
80 ///
81 /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
82 /// let service = DefaultConsensusService::default();
83 /// let scope = ScopeID::from("my_scope");
84 /// let request = CreateProposalRequest::new(
85 /// "Test Proposal".to_string(),
86 /// b"payload".to_vec(),
87 /// vec![0u8; 20],
88 /// 3,
89 /// 100,
90 /// true,
91 /// )?;
92 ///
93 /// let proposal = service.create_proposal_with_config(
94 /// &scope,
95 /// request,
96 /// Some(ConsensusConfig::p2p())
97 /// ).await?;
98 ///
99 /// let request2 = CreateProposalRequest::new(
100 /// "Another Proposal".to_string(),
101 /// b"payload2".to_vec(),
102 /// vec![0u8; 20],
103 /// 3,
104 /// 100,
105 /// true,
106 /// )?;
107 /// let proposal2 = service.create_proposal_with_config(
108 /// &scope,
109 /// request2,
110 /// None
111 /// ).await?;
112 /// Ok(())
113 /// }
114 /// ```
115 async fn create_proposal_with_config(
116 &self,
117 scope: &Scope,
118 request: CreateProposalRequest,
119 config: Option<ConsensusConfig>,
120 ) -> Result<Proposal, ConsensusError> {
121 let proposal = request.into_proposal()?;
122
123 // Resolve config: override > scope config > global default, aligning timeout with proposal
124 let config = self.resolve_config(scope, config, Some(&proposal)).await?;
125
126 let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
127 self.save_session(scope, session).await?;
128 self.trim_scope_sessions(scope).await?;
129
130 Ok(proposal)
131 }
132
133 /// Cast your vote on a proposal (yes or no).
134 ///
135 /// Vote is cryptographically signed and linked to previous votes in the hashgraph.
136 /// Returns the signed vote, which you can then send to other peers in the network.
137 /// Each voter can only vote once per proposal.
138 async fn cast_vote<SN: Signer + Sync + Send>(
139 &self,
140 scope: &Scope,
141 proposal_id: u32,
142 choice: bool,
143 signer: SN,
144 ) -> Result<Vote, ConsensusError> {
145 let session = self.get_session(scope, proposal_id).await?;
146
147 validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
148
149 let voter_address = signer.address().as_slice().to_vec();
150 if session.votes.contains_key(&voter_address) {
151 return Err(ConsensusError::UserAlreadyVoted);
152 }
153
154 let vote = build_vote(&session.proposal, choice, signer).await?;
155 let vote_clone = vote.clone();
156
157 let transition = self
158 .update_session(scope, proposal_id, move |session| {
159 session.add_vote(vote_clone)
160 })
161 .await?;
162
163 self.handle_transition(scope, proposal_id, transition);
164 Ok(vote)
165 }
166
167 /// Cast a vote and immediately get back the updated proposal.
168 ///
169 /// This is a convenience method that combines `cast_vote` and fetching the proposal.
170 /// Useful for proposal creator as they can immediately see the proposal with their vote
171 /// and share it with other peers.
172 async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
173 &self,
174 scope: &Scope,
175 proposal_id: u32,
176 choice: bool,
177 signer: SN,
178 ) -> Result<Proposal, ConsensusError> {
179 self.cast_vote(scope, proposal_id, choice, signer).await?;
180 let session = self.get_session(scope, proposal_id).await?;
181 Ok(session.proposal)
182 }
183
184 /// Process a proposal you received from another peer in the network.
185 ///
186 /// This validates the proposal and all its votes (signatures, vote chains, timestamps),
187 /// then stores it locally.
188 /// If it necessary the consensus configuration is resolved from the proposal.
189 /// If the proposal already has enough votes, consensus is reached
190 /// immediately and an event is emitted.
191 async fn process_incoming_proposal(
192 &self,
193 scope: &Scope,
194 proposal: Proposal,
195 ) -> Result<(), ConsensusError> {
196 if self.get_session(scope, proposal.proposal_id).await.is_ok() {
197 return Err(ConsensusError::ProposalAlreadyExist);
198 }
199
200 let config = self.resolve_config(scope, None, Some(&proposal)).await?;
201 let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
202 self.handle_transition(scope, session.proposal.proposal_id, transition);
203
204 self.save_session(scope, session).await?;
205 self.trim_scope_sessions(scope).await?;
206 Ok(())
207 }
208
209 /// Process a vote you received from another peer.
210 ///
211 /// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
212 /// If this vote brings the total to the consensus threshold, consensus is reached and
213 /// an event is emitted.
214 async fn process_incoming_vote(&self, scope: &Scope, vote: Vote) -> Result<(), ConsensusError> {
215 let session = self.get_session(scope, vote.proposal_id).await?;
216 validate_vote(
217 &vote,
218 session.proposal.expiration_timestamp,
219 session.proposal.timestamp,
220 )?;
221 let proposal_id = vote.proposal_id;
222 let transition = self
223 .update_session(scope, proposal_id, move |session| session.add_vote(vote))
224 .await?;
225
226 self.handle_transition(scope, proposal_id, transition);
227 Ok(())
228 }
229
230 async fn get_proposal(
231 &self,
232 scope: &Scope,
233 proposal_id: u32,
234 ) -> Result<Proposal, ConsensusError> {
235 let session = self.get_session(scope, proposal_id).await?;
236 Ok(session.proposal)
237 }
238
239 async fn get_proposal_payload(
240 &self,
241 scope: &Scope,
242 proposal_id: u32,
243 ) -> Result<Vec<u8>, ConsensusError> {
244 let session = self.get_session(scope, proposal_id).await?;
245 Ok(session.proposal.payload)
246 }
247}