hashgraph_like_consensus/
service_consensus.rs

1use alloy_signer::Signer;
2
3use crate::{
4    error::ConsensusError,
5    events::ConsensusEventBus,
6    protos::consensus::v1::{Proposal, Vote},
7    scope::ConsensusScope,
8    service::ConsensusService,
9    session::{ConsensusConfig, ConsensusSession},
10    storage::ConsensusStorage,
11    types::CreateProposalRequest,
12    utils::{build_vote, validate_proposal_timestamp, validate_vote},
13};
14
15impl<Scope, S, E> ConsensusService<Scope, S, E>
16where
17    Scope: ConsensusScope,
18    S: ConsensusStorage<Scope>,
19    E: ConsensusEventBus<Scope>,
20{
21    /// Create a new proposal and start the voting process.
22    ///
23    /// This creates the proposal, sets up a session to track votes, and schedules automatic
24    /// timeout handling. The proposal will expire after the time specified in the request.
25    ///
26    /// Configuration is resolved from: proposal config > scope config > global default.
27    /// If no config is provided, the scope's default configuration is used.
28    ///
29    /// # Examples
30    ///
31    /// ```rust
32    /// use hashgraph_like_consensus::{scope::ScopeID, scope_config::NetworkType,
33    /// service::DefaultConsensusService, types::CreateProposalRequest};
34    ///
35    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
36    ///     let service = DefaultConsensusService::default();
37    ///     let scope = ScopeID::from("my_scope");
38    ///
39    ///     service
40    ///         .scope(&scope)
41    ///         .await?
42    ///         .with_network_type(NetworkType::P2P)
43    ///         .with_threshold(0.75)
44    ///         .initialize()
45    ///         .await?;
46    ///
47    ///     let request = CreateProposalRequest::new(
48    ///         "Test Proposal".to_string(),
49    ///         "payload".to_string(),
50    ///         vec![0u8; 20],
51    ///         3,
52    ///         100,
53    ///         true,
54    ///     )?;
55    ///     let proposal = service.create_proposal(&scope, request).await?;
56    ///     Ok(())
57    /// }
58    /// ```
59    pub async fn create_proposal(
60        &self,
61        scope: &Scope,
62        request: CreateProposalRequest,
63    ) -> Result<Proposal, ConsensusError> {
64        self.create_proposal_with_config(scope, request, None).await
65    }
66
67    /// Create a new proposal with explicit configuration override.
68    ///
69    /// This allows you to override the scope's default configuration for a specific proposal.
70    /// The override takes precedence over scope config.
71    ///
72    /// # Examples
73    ///
74    /// ```rust
75    /// use hashgraph_like_consensus::{scope::ScopeID,
76    /// service::DefaultConsensusService, session::ConsensusConfig, types::CreateProposalRequest};
77    ///
78    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
79    ///     let service = DefaultConsensusService::default();
80    ///     let scope = ScopeID::from("my_scope");
81    ///     let request = CreateProposalRequest::new(
82    ///         "Test Proposal".to_string(),
83    ///         "payload".to_string(),
84    ///         vec![0u8; 20],
85    ///         3,
86    ///         100,
87    ///         true,
88    ///     )?;
89    ///
90    ///     let proposal = service.create_proposal_with_config(
91    ///         &scope,
92    ///         request,
93    ///         Some(ConsensusConfig::p2p())
94    ///     ).await?;
95    ///
96    ///     let request2 = CreateProposalRequest::new(
97    ///         "Another Proposal".to_string(),
98    ///         "payload2".to_string(),
99    ///         vec![0u8; 20],
100    ///         3,
101    ///         100,
102    ///         true,
103    ///     )?;
104    ///     let proposal2 = service.create_proposal_with_config(
105    ///         &scope,
106    ///         request2,
107    ///         None
108    ///     ).await?;
109    ///     Ok(())
110    /// }
111    /// ```
112    pub async fn create_proposal_with_config(
113        &self,
114        scope: &Scope,
115        request: CreateProposalRequest,
116        config: Option<ConsensusConfig>,
117    ) -> Result<Proposal, ConsensusError> {
118        let proposal = request.into_proposal()?;
119
120        // Resolve config: override > scope config > global default, aligning timeout with proposal
121        let config = self.resolve_config(scope, config, Some(&proposal)).await?;
122
123        let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
124        self.save_session(scope, session).await?;
125        self.trim_scope_sessions(scope).await?;
126
127        Ok(proposal)
128    }
129
130    /// Cast your vote on a proposal (yes or no).
131    ///
132    /// Vote is cryptographically signed and linked to previous votes in the hashgraph.
133    /// Returns the signed vote, which you can then send to other peers in the network.
134    /// Each voter can only vote once per proposal.
135    pub async fn cast_vote<SN: Signer + Sync>(
136        &self,
137        scope: &Scope,
138        proposal_id: u32,
139        choice: bool,
140        signer: SN,
141    ) -> Result<Vote, ConsensusError> {
142        let session = self.get_session(scope, proposal_id).await?;
143
144        validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
145
146        let voter_address = signer.address().as_slice().to_vec();
147        if session.votes.contains_key(&voter_address) {
148            return Err(ConsensusError::UserAlreadyVoted);
149        }
150
151        let vote = build_vote(&session.proposal, choice, signer).await?;
152        let vote_clone = vote.clone();
153
154        let transition = self
155            .update_session(scope, proposal_id, move |session| {
156                session.add_vote(vote_clone)
157            })
158            .await?;
159
160        self.handle_transition(scope, proposal_id, transition);
161        Ok(vote)
162    }
163
164    /// Cast a vote and immediately get back the updated proposal.
165    ///
166    /// This is a convenience method that combines `cast_vote` and fetching the proposal.
167    /// Useful for proposal creator as they can immediately see the proposal with their vote
168    /// and share it with other peers.
169    pub async fn cast_vote_and_get_proposal<SN: Signer + Sync>(
170        &self,
171        scope: &Scope,
172        proposal_id: u32,
173        choice: bool,
174        signer: SN,
175    ) -> Result<Proposal, ConsensusError> {
176        self.cast_vote(scope, proposal_id, choice, signer).await?;
177        let session = self.get_session(scope, proposal_id).await?;
178        Ok(session.proposal)
179    }
180
181    /// Process a proposal you received from another peer in the network.
182    ///
183    /// This validates the proposal and all its votes (signatures, vote chains, timestamps),
184    /// then stores it locally.
185    /// If it necessary the consensus configuration is resolved from the proposal.
186    /// If the proposal already has enough votes, consensus is reached
187    /// immediately and an event is emitted.
188    pub async fn process_incoming_proposal(
189        &self,
190        scope: &Scope,
191        proposal: Proposal,
192    ) -> Result<(), ConsensusError> {
193        if self.get_session(scope, proposal.proposal_id).await.is_ok() {
194            return Err(ConsensusError::ProposalAlreadyExist);
195        }
196
197        let config = self.resolve_config(scope, None, Some(&proposal)).await?;
198        let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
199        self.handle_transition(scope, session.proposal.proposal_id, transition);
200
201        self.save_session(scope, session).await?;
202        self.trim_scope_sessions(scope).await?;
203        Ok(())
204    }
205
206    /// Process a vote you received from another peer.
207    ///
208    /// The vote is validated (signature, timestamp, vote chain) and added to the proposal.
209    /// If this vote brings the total to the consensus threshold, consensus is reached and
210    /// an event is emitted.
211    pub async fn process_incoming_vote(
212        &self,
213        scope: &Scope,
214        vote: Vote,
215    ) -> Result<(), ConsensusError> {
216        let session = self.get_session(scope, vote.proposal_id).await?;
217        validate_vote(
218            &vote,
219            session.proposal.expiration_timestamp,
220            session.proposal.timestamp,
221        )?;
222        let proposal_id = vote.proposal_id;
223        let transition = self
224            .update_session(scope, proposal_id, move |session| session.add_vote(vote))
225            .await?;
226
227        self.handle_transition(scope, proposal_id, transition);
228        Ok(())
229    }
230}