// hashgraph_like_consensus/service.rs

1use std::marker::PhantomData;
2
3use alloy_signer::Signer;
4use tokio::time::Duration;
5
6use crate::{
7    error::ConsensusError,
8    events::{BroadcastEventBus, ConsensusEventBus},
9    protos::consensus::v1::{Proposal, Vote},
10    scope::{ConsensusScope, ScopeID},
11    scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
12    session::{ConsensusConfig, ConsensusSession, ConsensusState},
13    storage::{ConsensusStorage, InMemoryConsensusStorage},
14    types::{ConsensusEvent, CreateProposalRequest, SessionTransition},
15    utils::{
16        build_vote, calculate_consensus_result, current_timestamp, validate_proposal_timestamp,
17        validate_vote,
18    },
19};
/// The main service that handles proposals, votes, and consensus.
///
/// This is the main entry point for using the consensus service.
/// It handles creating proposals, processing votes, and managing timeouts.
///
/// Generic over the scope type, the storage backend, and the event bus so
/// deployments can plug in their own persistence and notification layers.
pub struct ConsensusService<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    // Backend that persists sessions and per-scope configuration.
    storage: S,
    // Cap on sessions kept per scope; older sessions are trimmed past it.
    max_sessions_per_scope: usize,
    // Sink that consensus events are published to.
    event_bus: E,
    // Zero-sized marker tying the service to its `Scope` type parameter.
    _scope: PhantomData<Scope>,
}
35
36impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
37where
38    Scope: ConsensusScope,
39    S: ConsensusStorage<Scope>,
40    E: ConsensusEventBus<Scope>,
41{
42    fn clone(&self) -> Self {
43        Self {
44            storage: self.storage.clone(),
45            max_sessions_per_scope: self.max_sessions_per_scope,
46            event_bus: self.event_bus.clone(),
47            _scope: PhantomData,
48        }
49    }
50}
51
/// A ready-to-use service with in-memory storage and broadcast events.
///
/// This is the easiest way to get started. It stores everything in memory (great for
/// testing or single-node setups, though state is lost when the process exits) and
/// uses a simple broadcast channel for events.
/// If you need persistence or custom event handling, use [`ConsensusService`] directly.
pub type DefaultConsensusService =
    ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
59
60impl DefaultConsensusService {
61    /// Create a service with default settings (10 max sessions per scope).
62    fn new() -> Self {
63        Self::new_with_max_sessions(10)
64    }
65
66    /// Create a service with a custom limit on how many sessions can exist per scope.
67    ///
68    /// When the limit is reached, older sessions are automatically removed to make room.
69    /// Eviction is silent — no event is emitted. Archive results you need before they
70    /// are evicted.
71    pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
72        Self::new_with_components(
73            InMemoryConsensusStorage::new(),
74            BroadcastEventBus::default(),
75            max_sessions_per_scope,
76        )
77    }
78}
79
80impl Default for DefaultConsensusService {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl<Scope, S, E> ConsensusService<Scope, S, E>
87where
88    Scope: ConsensusScope,
89    S: ConsensusStorage<Scope>,
90    E: ConsensusEventBus<Scope>,
91{
92    /// Build a service with your own storage and event bus implementations.
93    ///
94    /// Use this when you need custom persistence (like a database) or event handling.
95    /// The `max_sessions_per_scope` parameter controls how many sessions can exist per scope.
96    /// When the limit is reached, older sessions are automatically removed.
97    pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
98        Self {
99            storage,
100            max_sessions_per_scope,
101            event_bus,
102            _scope: PhantomData,
103        }
104    }
105
106    // ── Accessors ──────────────────────────────────────────────────────
107
    /// Access the underlying storage backend.
    ///
    /// Use this for reading state (sessions, proposals, scope config) and for
    /// lifecycle operations like [`delete_scope`](ConsensusStorage::delete_scope).
    pub fn storage(&self) -> &S {
        &self.storage
    }

    /// Access the underlying event bus.
    ///
    /// Use this to [`subscribe`](ConsensusEventBus::subscribe) to consensus events
    /// such as `ConsensusReached` / `ConsensusFailed`.
    pub fn event_bus(&self) -> &E {
        &self.event_bus
    }
122
123    // ── Consensus operations (business logic) ──────────────────────────
124
125    /// Create a new proposal and start the voting process.
126    ///
127    /// This creates the proposal and sets up a session to track votes.
128    /// The proposal will expire after the time specified in the request.
129    ///
130    /// **Important:** The library does not schedule timeouts automatically.
131    /// Your application MUST call [`handle_consensus_timeout`](Self::handle_consensus_timeout)
132    /// when the proposal's timeout elapses (e.g. via `tokio::time::sleep`).
133    /// Without this call, proposals with offline voters will remain stuck in the
134    /// `Active` state indefinitely, and the liveness criteria for silent peers
135    /// will never take effect.
136    ///
137    /// Configuration is resolved from: proposal config > scope config > global default.
138    /// If no config is provided, the scope's default configuration is used.
139    pub async fn create_proposal(
140        &self,
141        scope: &Scope,
142        request: CreateProposalRequest,
143    ) -> Result<Proposal, ConsensusError> {
144        self.create_proposal_with_config(scope, request, None).await
145    }
146
147    /// Create a new proposal with an explicit [`ConsensusConfig`] override.
148    ///
149    /// Pass `None` to fall back to scope defaults (same as [`create_proposal`](Self::create_proposal)).
150    pub async fn create_proposal_with_config(
151        &self,
152        scope: &Scope,
153        request: CreateProposalRequest,
154        config: Option<ConsensusConfig>,
155    ) -> Result<Proposal, ConsensusError> {
156        let proposal = request.into_proposal()?;
157        let config = self.resolve_config(scope, config, Some(&proposal)).await?;
158        let (session, _) = ConsensusSession::from_proposal(proposal.clone(), config.clone())?;
159        self.save_session(scope, session).await?;
160        self.trim_scope_sessions(scope).await?;
161        Ok(proposal)
162    }
163
164    /// Cast a vote on an active proposal.
165    ///
166    /// The vote is cryptographically signed with `signer` and linked into the
167    /// hashgraph chain. Returns the signed [`Vote`] for network propagation.
168    /// Each voter can only vote once per proposal.
169    pub async fn cast_vote<SN: Signer + Sync + Send>(
170        &self,
171        scope: &Scope,
172        proposal_id: u32,
173        choice: bool,
174        signer: SN,
175    ) -> Result<Vote, ConsensusError> {
176        let session = self.get_session(scope, proposal_id).await?;
177        validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
178
179        let voter_address = signer.address().as_slice().to_vec();
180        if session.votes.contains_key(&voter_address) {
181            return Err(ConsensusError::UserAlreadyVoted);
182        }
183
184        let vote = build_vote(&session.proposal, choice, signer).await?;
185        let vote_clone = vote.clone();
186        let transition = self
187            .update_session(scope, proposal_id, move |session| {
188                session.add_vote(vote_clone)
189            })
190            .await?;
191        self.handle_transition(scope, proposal_id, transition);
192        Ok(vote)
193    }
194
195    /// Cast a vote and return the updated [`Proposal`] (with the new vote included).
196    ///
197    /// Convenience method useful for the proposal creator who wants to immediately
198    /// gossip the updated proposal to peers.
199    pub async fn cast_vote_and_get_proposal<SN: Signer + Sync + Send>(
200        &self,
201        scope: &Scope,
202        proposal_id: u32,
203        choice: bool,
204        signer: SN,
205    ) -> Result<Proposal, ConsensusError> {
206        self.cast_vote(scope, proposal_id, choice, signer).await?;
207        let session = self.get_session(scope, proposal_id).await?;
208        Ok(session.proposal)
209    }
210
211    /// Process a proposal received from the network.
212    ///
213    /// Call this when your networking layer delivers a proposal from another peer.
214    /// The library performs no I/O — your application must handle gossip/transport
215    /// and call this method on receipt.
216    ///
217    /// Validates the proposal and all embedded votes, then stores it locally.
218    /// If enough votes are already present, consensus is reached immediately.
219    pub async fn process_incoming_proposal(
220        &self,
221        scope: &Scope,
222        proposal: Proposal,
223    ) -> Result<(), ConsensusError> {
224        if self.get_session(scope, proposal.proposal_id).await.is_ok() {
225            return Err(ConsensusError::ProposalAlreadyExist);
226        }
227        let config = self.resolve_config(scope, None, Some(&proposal)).await?;
228        let (session, transition) = ConsensusSession::from_proposal(proposal, config)?;
229        self.handle_transition(scope, session.proposal.proposal_id, transition);
230        self.save_session(scope, session).await?;
231        self.trim_scope_sessions(scope).await?;
232        Ok(())
233    }
234
235    /// Process a single vote received from the network.
236    ///
237    /// Call this when your networking layer delivers a vote from another peer.
238    /// Validates the vote (signature, timestamp, chain) and adds it to the
239    /// corresponding proposal session. May trigger consensus.
240    pub async fn process_incoming_vote(
241        &self,
242        scope: &Scope,
243        vote: Vote,
244    ) -> Result<(), ConsensusError> {
245        let session = self.get_session(scope, vote.proposal_id).await?;
246        validate_vote(
247            &vote,
248            session.proposal.expiration_timestamp,
249            session.proposal.timestamp,
250        )?;
251        let proposal_id = vote.proposal_id;
252        let transition = self
253            .update_session(scope, proposal_id, move |session| session.add_vote(vote))
254            .await?;
255        self.handle_transition(scope, proposal_id, transition);
256        Ok(())
257    }
258
259    /// Handle the timeout for a proposal.
260    ///
261    /// **The library does not call this automatically.** Your application MUST
262    /// schedule a timer (e.g. `tokio::time::sleep(config.consensus_timeout())`)
263    /// and invoke this method when it fires. Without this call, proposals with
264    /// offline voters will stay in the `Active` state forever and silent-peer
265    /// liveness logic will never run.
266    ///
267    /// At timeout, silent peers are counted toward quorum using the proposal's
268    /// `liveness_criteria_yes` flag (RFC Section 4, Silent Node Management):
269    ///
270    /// - `liveness_criteria_yes = true` — silent peers count as YES
271    /// - `liveness_criteria_yes = false` — silent peers count as NO
272    ///
273    /// Returns the consensus result if determinable, or
274    /// [`InsufficientVotesAtTimeout`](ConsensusError::InsufficientVotesAtTimeout)
275    /// if the result is a tie after counting silent peers.
276    pub async fn handle_consensus_timeout(
277        &self,
278        scope: &Scope,
279        proposal_id: u32,
280    ) -> Result<bool, ConsensusError> {
281        let timeout_result: Result<Option<bool>, ConsensusError> = self
282            .update_session(scope, proposal_id, |session| {
283                if let ConsensusState::ConsensusReached(result) = session.state {
284                    return Ok(Some(result));
285                }
286                let result = calculate_consensus_result(
287                    &session.votes,
288                    session.proposal.expected_voters_count,
289                    session.config.consensus_threshold(),
290                    session.proposal.liveness_criteria_yes,
291                    true,
292                );
293                if let Some(result) = result {
294                    session.state = ConsensusState::ConsensusReached(result);
295                    Ok(Some(result))
296                } else {
297                    session.state = ConsensusState::Failed;
298                    Ok(None)
299                }
300            })
301            .await;
302
303        match timeout_result? {
304            Some(consensus_result) => {
305                self.emit_event(
306                    scope,
307                    ConsensusEvent::ConsensusReached {
308                        proposal_id,
309                        result: consensus_result,
310                        timestamp: current_timestamp()?,
311                    },
312                );
313                Ok(consensus_result)
314            }
315            None => {
316                self.emit_event(
317                    scope,
318                    ConsensusEvent::ConsensusFailed {
319                        proposal_id,
320                        timestamp: current_timestamp()?,
321                    },
322                );
323                Err(ConsensusError::InsufficientVotesAtTimeout)
324            }
325        }
326    }
327
328    // ── Scope management ─────────────────────────────────────────────
329
330    /// Get a builder for a scope configuration.
331    ///
332    /// # Example
333    /// ```rust,no_run
334    /// use hashgraph_like_consensus::{scope_config::NetworkType, scope::ScopeID, service::DefaultConsensusService};
335    /// use std::time::Duration;
336    ///
337    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
338    ///   let service = DefaultConsensusService::default();
339    ///   let scope = ScopeID::from("my_scope");
340    ///
341    ///   // Initialize new scope
342    ///   service
343    ///     .scope(&scope)
344    ///     .await?
345    ///     .with_network_type(NetworkType::P2P)
346    ///     .with_threshold(0.75)
347    ///     .with_timeout(Duration::from_secs(120))
348    ///     .initialize()
349    ///     .await?;
350    ///
351    ///   // Update existing scope (single field)
352    ///   service
353    ///     .scope(&scope)
354    ///     .await?
355    ///     .with_threshold(0.8)
356    ///     .update()
357    ///     .await?;
358    ///   Ok(())
359    /// }
360    /// ```
361    pub async fn scope(
362        &self,
363        scope: &Scope,
364    ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
365        let existing_config = self.storage.get_scope_config(scope).await?;
366        let builder = if let Some(config) = existing_config {
367            ScopeConfigBuilder::from_existing(config)
368        } else {
369            ScopeConfigBuilder::new()
370        };
371        Ok(ScopeConfigBuilderWrapper::new(
372            self.clone(),
373            scope.clone(),
374            builder,
375        ))
376    }
377
    /// Validate `config` and persist it as the scope's configuration.
    async fn initialize_scope(
        &self,
        scope: &Scope,
        config: ScopeConfig,
    ) -> Result<(), ConsensusError> {
        config.validate()?;
        self.storage.set_scope_config(scope, config).await
    }

    /// Apply `updater` to the scope's stored configuration in place.
    ///
    /// Thin forward; behavior for a missing scope is defined by the storage
    /// backend's `update_scope_config` implementation.
    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
    where
        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
    {
        self.storage.update_scope_config(scope, updater).await
    }
393
394    /// Resolve configuration for a proposal.
395    ///
396    /// Priority: proposal override > proposal fields (expiration_timestamp, liveness_criteria_yes)
397    ///   > scope config > global default
398    async fn resolve_config(
399        &self,
400        scope: &Scope,
401        proposal_override: Option<ConsensusConfig>,
402        proposal: Option<&Proposal>,
403    ) -> Result<ConsensusConfig, ConsensusError> {
404        // 1. If explicit config override exists, use it as base
405        // NOTE: if a per-proposal override is provided, we should not stomp its timeout
406        // from the proposal's expiration fields (the caller explicitly chose it).
407        let has_explicit_override = proposal_override.is_some();
408        let base_config = if let Some(override_config) = proposal_override {
409            override_config
410        } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
411            ConsensusConfig::from(scope_config)
412        } else {
413            ConsensusConfig::gossipsub()
414        };
415
416        // 2. Apply proposal field overrides if proposal is provided
417        if let Some(prop) = proposal {
418            // Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
419            // unless an explicit override was supplied.
420            let timeout_seconds = if has_explicit_override {
421                base_config.consensus_timeout()
422            } else if prop.expiration_timestamp > prop.timestamp {
423                Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
424            } else {
425                base_config.consensus_timeout()
426            };
427
428            Ok(ConsensusConfig::new(
429                base_config.consensus_threshold(),
430                timeout_seconds,
431                base_config.max_rounds(),
432                base_config.use_gossipsub_rounds(),
433                prop.liveness_criteria_yes,
434            ))
435        } else {
436            Ok(base_config)
437        }
438    }
439
440    async fn get_session(
441        &self,
442        scope: &Scope,
443        proposal_id: u32,
444    ) -> Result<ConsensusSession, ConsensusError> {
445        self.storage
446            .get_session(scope, proposal_id)
447            .await?
448            .ok_or(ConsensusError::SessionNotFound)
449    }
450
    /// Run `mutator` against the stored session and return its result.
    ///
    /// Thin forward to the storage backend.
    async fn update_session<R, F>(
        &self,
        scope: &Scope,
        proposal_id: u32,
        mutator: F,
    ) -> Result<R, ConsensusError>
    where
        R: Send,
        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
    {
        self.storage
            .update_session(scope, proposal_id, mutator)
            .await
    }

    /// Persist `session` under `scope` via the storage backend.
    async fn save_session(
        &self,
        scope: &Scope,
        session: ConsensusSession,
    ) -> Result<(), ConsensusError> {
        self.storage.save_session(scope, session).await
    }
473
474    async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
475        self.storage
476            .update_scope_sessions(scope, |sessions| {
477                if sessions.len() <= self.max_sessions_per_scope {
478                    return Ok(());
479                }
480
481                sessions.sort_by_key(|s| std::cmp::Reverse(s.created_at));
482                sessions.truncate(self.max_sessions_per_scope);
483                Ok(())
484            })
485            .await
486    }
487
488    pub(crate) async fn list_scope_sessions(
489        &self,
490        scope: &Scope,
491    ) -> Result<Vec<ConsensusSession>, ConsensusError> {
492        self.storage
493            .list_scope_sessions(scope)
494            .await?
495            .ok_or(ConsensusError::ScopeNotFound)
496    }
497
498    fn handle_transition(&self, scope: &Scope, proposal_id: u32, transition: SessionTransition) {
499        if let SessionTransition::ConsensusReached(result) = transition {
500            self.emit_event(
501                scope,
502                ConsensusEvent::ConsensusReached {
503                    proposal_id,
504                    result,
505                    timestamp: current_timestamp().unwrap_or(0),
506                },
507            );
508        }
509    }
510
    /// Publish `event` for `scope` on the configured event bus.
    fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
        self.event_bus.publish(scope.clone(), event);
    }
514}
515
/// Wrapper around [`ScopeConfigBuilder`] that also stores the service and scope,
/// so the built configuration can be applied directly via `initialize()`/`update()`.
pub struct ScopeConfigBuilderWrapper<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    // Cloned service handle used to persist the built configuration.
    service: ConsensusService<Scope, S, E>,
    // Scope the configuration will be stored under.
    scope: Scope,
    // Underlying builder holding the accumulated settings.
    builder: ScopeConfigBuilder,
}
527
528impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
529where
530    Scope: ConsensusScope,
531    S: ConsensusStorage<Scope>,
532    E: ConsensusEventBus<Scope>,
533{
534    fn new(
535        service: ConsensusService<Scope, S, E>,
536        scope: Scope,
537        builder: ScopeConfigBuilder,
538    ) -> Self {
539        Self {
540            service,
541            scope,
542            builder,
543        }
544    }
545
546    /// Set network type (P2P or Gossipsub)
547    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
548        self.builder = self.builder.with_network_type(network_type);
549        self
550    }
551
552    /// Set consensus threshold (0.0 to 1.0)
553    pub fn with_threshold(mut self, threshold: f64) -> Self {
554        self.builder = self.builder.with_threshold(threshold);
555        self
556    }
557
558    /// Set default timeout for proposals (in seconds)
559    pub fn with_timeout(mut self, timeout: Duration) -> Self {
560        self.builder = self.builder.with_timeout(timeout);
561        self
562    }
563
564    /// Set liveness criteria (how silent peers are counted)
565    pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
566        self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
567        self
568    }
569
570    /// Override max rounds (if None, uses network_type defaults)
571    pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
572        self.builder = self.builder.with_max_rounds(max_rounds);
573        self
574    }
575
576    /// Use P2P preset with common defaults
577    pub fn p2p_preset(mut self) -> Self {
578        self.builder = self.builder.p2p_preset();
579        self
580    }
581
582    /// Use Gossipsub preset with common defaults
583    pub fn gossipsub_preset(mut self) -> Self {
584        self.builder = self.builder.gossipsub_preset();
585        self
586    }
587
588    /// Use strict consensus (higher threshold = 0.9)
589    pub fn strict_consensus(mut self) -> Self {
590        self.builder = self.builder.strict_consensus();
591        self
592    }
593
594    /// Use fast consensus (lower threshold = 0.6, shorter timeout = 30s)
595    pub fn fast_consensus(mut self) -> Self {
596        self.builder = self.builder.fast_consensus();
597        self
598    }
599
600    /// Start with network-specific defaults
601    pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
602        self.builder = self.builder.with_network_defaults(network_type);
603        self
604    }
605
606    /// Initialize scope with the built configuration
607    pub async fn initialize(self) -> Result<(), ConsensusError> {
608        let config = self.builder.build()?;
609        self.service.initialize_scope(&self.scope, config).await
610    }
611
612    /// Update existing scope configuration with the built configuration
613    pub async fn update(self) -> Result<(), ConsensusError> {
614        let config = self.builder.build()?;
615        self.service
616            .update_scope_config(&self.scope, |existing| {
617                *existing = config;
618                Ok(())
619            })
620            .await
621    }
622
623    /// Get the current configuration (useful for testing)
624    pub fn get_config(&self) -> ScopeConfig {
625        self.builder.get_config()
626    }
627}