Skip to main content

hashgraph_like_consensus/
service_consensus.rs

1//! Implementation of [`ConsensusServiceAPI`] for [`ConsensusService`].
2
3use alloy_signer::Signer;
4
5use crate::{
6    api::ConsensusServiceAPI,
7    error::ConsensusError,
8    events::ConsensusEventBus,
9    protos::consensus::v1::{Proposal, Vote},
10    scope::ConsensusScope,
11    service::ConsensusService,
12    session::{ConsensusConfig, ConsensusSession},
13    storage::ConsensusStorage,
14    types::CreateProposalRequest,
15    utils::{build_vote, validate_proposal_timestamp, validate_vote},
16};
17
18impl<Scope, S, E> ConsensusServiceAPI<Scope, S, E> for ConsensusService<Scope, S, E>
19where
20    Scope: ConsensusScope,
21    S: ConsensusStorage<Scope>,
22    E: ConsensusEventBus<Scope>,
23{
24    /// Create a new proposal and start the voting process.
25    ///
26    /// This creates the proposal, sets up a session to track votes, and schedules automatic
27    /// timeout handling. The proposal will expire after the time specified in the request.
28    ///
29    /// Configuration is resolved from: proposal config > scope config > global default.
30    /// If no config is provided, the scope's default configuration is used.
31    ///
32    /// # Examples
33    ///
34    /// ```rust
35    /// use hashgraph_like_consensus::{api::ConsensusServiceAPI, scope::ScopeID,
36    /// scope_config::NetworkType, service::DefaultConsensusService, types::CreateProposalRequest};
37    ///
38    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
39    ///     let service = DefaultConsensusService::default();
40    ///     let scope = ScopeID::from("my_scope");
41    ///
42    ///     service
43    ///         .scope(&scope)
44    ///         .await?
45    ///         .with_network_type(NetworkType::P2P)
46    ///         .with_threshold(0.75)
47    ///         .initialize()
48    ///         .await?;
49    ///
50    ///     let request = CreateProposalRequest::new(
51    ///         "Test Proposal".to_string(),
52    ///         b"payload".to_vec(),
53    ///         vec![0u8; 20],
54    ///         3,
55    ///         100,
56    ///         true,
57    ///     )?;
58    ///     let proposal = service.create_proposal(&scope, request).await?;
59    ///     Ok(())
60    /// }
61    /// ```
62    async fn create_proposal(
63        &self,
64        scope: &Scope,
65        request: CreateProposalRequest,
66    ) -> Result<Proposal, ConsensusError> {
67        self.create_proposal_with_config(scope, request, None).await
68    }
69
70    /// Create a new proposal with explicit configuration override.
71    ///
72    /// This allows you to override the scope's default configuration for a specific proposal.
73    /// The override takes precedence over scope config.
74    ///
75    /// # Examples
76    ///
77    /// ```rust
78    /// use hashgraph_like_consensus::{api::ConsensusServiceAPI, scope::ScopeID,
79    /// service::DefaultConsensusService, session::ConsensusConfig, types::CreateProposalRequest};
80    ///
81    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
82    ///     let service = DefaultConsensusService::default();
83    ///     let scope = ScopeID::from("my_scope");
84    ///     let request = CreateProposalRequest::new(
85    ///         "Test Proposal".to_string(),
86    ///         b"payload".to_vec(),
87    ///         vec![0u8; 20],
88    ///         3,
89    ///         100,
90    ///         true,
91    ///     )?;
92    ///
93    ///     let proposal = service.create_proposal_with_config(
94    ///         &scope,
95    ///         request,
96    ///         Some(ConsensusConfig::p2p())
97    ///     ).await?;
98    ///
99    ///     let request2 = CreateProposalRequest::new(
100    ///         "Another Proposal".to_string(),
101    ///         b"payload2".to_vec(),
102    ///         vec![0u8; 20],
103    ///         3,
104    ///         100,
105    ///         true,
106    ///     )?;
107    ///     let proposal2 = service.create_proposal_with_config(
108    ///         &scope,
109    ///         request2,
110    ///         None
111    ///     ).await?;
112    ///     Ok(())
113    /// }
114    /// ```
115    async fn create_proposal_with_config(
116        &self,
117        scope: &Scope,
118        request: CreateProposalRequest,
119        config: Option<ConsensusConfig>,
120    ) -> Result<Proposal, ConsensusError> {
121        let proposal = request.into_proposal()?;
122
123        // Resolve config: override > scope config > global default, aligning timeout with proposal
124        let config = self.resolve_config(scope, config, Some(&proposal)).await?;
125
126        let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
127        self.save_session(scope, session).await?;
128        self.trim_scope_sessions(scope).await?;
129
130        Ok(proposal)
131    }
132
133    /// Cast your vote on a proposal (yes or no).
134    ///
135    /// Vote is cryptographically signed and linked to previous votes in the hashgraph.
136    /// Returns the signed vote, which you can then send to other peers in the network.
137    /// Each voter can only vote once per proposal.
138    async fn cast_vote<SN: Signer + Sync + Send>(
139        &self,
140        scope: &Scope,
141        proposal_id: u32,
142        choice: bool,
143        signer: SN,
144    ) -> Result<Vote, ConsensusError> {
145        let session = self.get_session(scope, proposal_id).await?;
146
147        validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
148
149        let voter_address = signer.address().as_slice().to_vec();
150        if session.votes.contains_key(&voter_address) {
151            return Err(ConsensusError::UserAlreadyVoted);
152        }
153
154        let vote = build_vote(&session.proposal, choice, signer).await?;
155        let vote_clone = vote.clone();
156
157        let transition = self
158            .update_session(scope, proposal_id, move |session| {
159                session.add_vote(vote_clone)
160            })
161            .await?;
162
163        self.handle_transition(scope, proposal_id, transition);
164        Ok(vote)
165    }
166
167    /// Cast a vote and immediately get back the updated proposal.
168    ///
169    /// This is a convenience method that combines `cast_vote` and fetching the proposal.
170    /// Useful for proposal creator as they can immediately see the proposal with their vote
171    /// and share it with other peers.
172    async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
173        &self,
174        scope: &Scope,
175        proposal_id: u32,
176        choice: bool,
177        signer: SN,
178    ) -> Result<Proposal, ConsensusError> {
179        self.cast_vote(scope, proposal_id, choice, signer).await?;
180        let session = self.get_session(scope, proposal_id).await?;
181        Ok(session.proposal)
182    }
183
184    /// Process a proposal you received from another peer in the network.
185    ///
186    /// This validates the proposal and all its votes (signatures, vote chains, timestamps),
187    /// then stores it locally.
188    /// If it necessary the consensus configuration is resolved from the proposal.
189    /// If the proposal already has enough votes, consensus is reached
190    /// immediately and an event is emitted.
191    async fn process_incoming_proposal(
192        &self,
193        scope: &Scope,
194        proposal: Proposal,
195    ) -> Result<(), ConsensusError> {
196        if self.get_session(scope, proposal.proposal_id).await.is_ok() {
197            return Err(ConsensusError::ProposalAlreadyExist);
198        }
199
200        let config = self.resolve_config(scope, None, Some(&proposal)).await?;
201        let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
202        self.handle_transition(scope, session.proposal.proposal_id, transition);
203
204        self.save_session(scope, session).await?;
205        self.trim_scope_sessions(scope).await?;
206        Ok(())
207    }
208
209    /// Process a vote you received from another peer.
210    ///
211    /// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
212    /// If this vote brings the total to the consensus threshold, consensus is reached and
213    /// an event is emitted.
214    async fn process_incoming_vote(&self, scope: &Scope, vote: Vote) -> Result<(), ConsensusError> {
215        let session = self.get_session(scope, vote.proposal_id).await?;
216        validate_vote(
217            &vote,
218            session.proposal.expiration_timestamp,
219            session.proposal.timestamp,
220        )?;
221        let proposal_id = vote.proposal_id;
222        let transition = self
223            .update_session(scope, proposal_id, move |session| session.add_vote(vote))
224            .await?;
225
226        self.handle_transition(scope, proposal_id, transition);
227        Ok(())
228    }
229
230    async fn get_proposal(
231        &self,
232        scope: &Scope,
233        proposal_id: u32,
234    ) -> Result<Proposal, ConsensusError> {
235        let session = self.get_session(scope, proposal_id).await?;
236        Ok(session.proposal)
237    }
238
239    async fn get_proposal_payload(
240        &self,
241        scope: &Scope,
242        proposal_id: u32,
243    ) -> Result<Vec<u8>, ConsensusError> {
244        let session = self.get_session(scope, proposal_id).await?;
245        Ok(session.proposal.payload)
246    }
247}