Skip to main content

hashgraph_like_consensus/
service.rs

1use std::marker::PhantomData;
2
3use tokio::time::Duration;
4
5use crate::{
6    error::ConsensusError,
7    events::{BroadcastEventBus, ConsensusEventBus},
8    protos::consensus::v1::{Proposal, Vote},
9    scope::{ConsensusScope, ScopeID},
10    scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
11    session::{ConsensusConfig, ConsensusSession, ConsensusState},
12    signing::{ConsensusSignatureScheme, EthereumConsensusSigner},
13    storage::{ConsensusStorage, InMemoryConsensusStorage},
14    types::{ConsensusEvent, CreateProposalRequest, SessionTransition},
15    utils::{
16        build_vote, calculate_consensus_result, current_timestamp, validate_proposal_timestamp,
17        validate_vote,
18    },
19};
/// The main service that handles proposals, votes, and consensus.
///
/// This is the main entry point for using the consensus service.
/// It handles creating proposals, processing votes, and managing timeouts.
///
/// Each `ConsensusService` represents **one peer's view**: it holds the
/// storage handle, the event bus, and that peer's signer. The multi-peer
/// pattern is one service per peer with shared storage and (optionally)
/// shared event bus — see the README "Service shape" section.
///
/// Generic parameters:
///
/// - `Scope`: scope key type (see [`ConsensusScope`]).
/// - `Storage`: storage backend (see [`ConsensusStorage`]).
/// - `Event`: event bus backend (see [`ConsensusEventBus`]).
/// - `Signer`: signature scheme (see [`ConsensusSignatureScheme`]).
///   The held instance signs this peer's outgoing votes; the same type
///   provides `Signer::verify` for validating incoming votes.
pub struct ConsensusService<Scope, Storage, Event, Signer>
where
    Scope: ConsensusScope,
    Storage: ConsensusStorage<Scope>,
    Event: ConsensusEventBus<Scope>,
    Signer: ConsensusSignatureScheme,
{
    /// Backend through which sessions and scope configuration are read and written.
    storage: Storage,
    /// Maximum number of sessions kept per scope; applied whenever sessions are trimmed.
    max_sessions_per_scope: usize,
    /// Bus on which consensus events are published.
    event_bus: Event,
    /// This peer's signer; used to build outgoing votes and to read the peer identity.
    signer: Signer,
    /// Zero-sized marker binding the service to its `Scope` type without storing one.
    _scope: PhantomData<Scope>,
}
51
52impl<Scope, Storage, Event, Signer> Clone for ConsensusService<Scope, Storage, Event, Signer>
53where
54    Scope: ConsensusScope,
55    Storage: ConsensusStorage<Scope>,
56    Event: ConsensusEventBus<Scope>,
57    Signer: ConsensusSignatureScheme,
58{
59    fn clone(&self) -> Self {
60        Self {
61            storage: self.storage.clone(),
62            max_sessions_per_scope: self.max_sessions_per_scope,
63            event_bus: self.event_bus.clone(),
64            signer: self.signer.clone(),
65            _scope: PhantomData,
66        }
67    }
68}
69
/// A ready-to-use service with in-memory storage, broadcast events, and the
/// [`EthereumConsensusSigner`] scheme. Intended for tests and single-node
/// integrations. Production deployments typically construct
/// [`ConsensusService`] directly with their own scheme.
///
/// Concretely this fixes the four generic parameters of [`ConsensusService`] to
/// [`ScopeID`], [`InMemoryConsensusStorage`], [`BroadcastEventBus`] and
/// [`EthereumConsensusSigner`].
pub type DefaultConsensusService = ConsensusService<
    ScopeID,
    InMemoryConsensusStorage<ScopeID>,
    BroadcastEventBus<ScopeID>,
    EthereumConsensusSigner,
>;
80
81impl DefaultConsensusService {
82    /// Create a service with in-memory storage, broadcast events, and the
83    /// given signer. Defaults to 10 max sessions per scope.
84    pub fn new(signer: EthereumConsensusSigner) -> Self {
85        Self::new_with_max_sessions(signer, 10)
86    }
87
88    /// Create a service with a custom limit on how many sessions can exist per scope.
89    ///
90    /// When the limit is reached, older sessions are automatically removed to make room.
91    /// Eviction is silent — no event is emitted. Archive results you need before they
92    /// are evicted.
93    pub fn new_with_max_sessions(
94        signer: EthereumConsensusSigner,
95        max_sessions_per_scope: usize,
96    ) -> Self {
97        Self::new_with_components(
98            InMemoryConsensusStorage::new(),
99            BroadcastEventBus::default(),
100            signer,
101            max_sessions_per_scope,
102        )
103    }
104}
105
106impl<Scope, Storage, Event, Signer> ConsensusService<Scope, Storage, Event, Signer>
107where
108    Scope: ConsensusScope,
109    Storage: ConsensusStorage<Scope>,
110    Event: ConsensusEventBus<Scope>,
111    Signer: ConsensusSignatureScheme,
112{
    /// Build a service with your own storage and event bus implementations.
    ///
    /// The signature scheme `Signer` is selected via turbofish or type inference at
    /// the call site (or by using a type alias like
    /// [`DefaultConsensusService`]). Use this when you need custom persistence
    /// (like a database) or event handling. The `max_sessions_per_scope`
    /// parameter controls how many sessions can exist per scope. When the
    /// limit is reached, older sessions are automatically removed.
    ///
    /// The arguments are stored as given; no validation or I/O happens here.
    pub fn new_with_components(
        storage: Storage,
        event_bus: Event,
        signer: Signer,
        max_sessions_per_scope: usize,
    ) -> Self {
        Self {
            storage,
            max_sessions_per_scope,
            event_bus,
            signer,
            _scope: PhantomData,
        }
    }
135
136    // ── Accessors ──────────────────────────────────────────────────────
137
    /// Access the underlying storage backend.
    ///
    /// Use this for reading state (sessions, proposals, scope config) and for
    /// lifecycle operations like [`delete_scope`](ConsensusStorage::delete_scope).
    ///
    /// Returns a shared borrow of the backend held by this service.
    pub fn storage(&self) -> &Storage {
        &self.storage
    }
145
    /// Access the underlying event bus.
    ///
    /// Use this to [`subscribe`](ConsensusEventBus::subscribe) to consensus events.
    ///
    /// Returns a shared borrow of the bus this service publishes to.
    pub fn event_bus(&self) -> &Event {
        &self.event_bus
    }
152
    /// Access this peer's signer.
    ///
    /// Use this to read the peer's identity (e.g. for proposal owner fields):
    /// `service.signer().identity()`.
    ///
    /// Returns a shared borrow of the signer this service votes with.
    pub fn signer(&self) -> &Signer {
        &self.signer
    }
160
161    // ── Consensus operations (business logic) ──────────────────────────
162
    /// Create a new proposal and start the voting process.
    ///
    /// This creates the proposal and sets up a session to track votes.
    /// The proposal will expire after the time specified in the request.
    ///
    /// **Important:** The library does not schedule timeouts automatically.
    /// Your application MUST call [`handle_consensus_timeout`](Self::handle_consensus_timeout)
    /// when the proposal's timeout elapses (e.g. via `tokio::time::sleep`).
    /// Without this call, proposals with offline voters will remain stuck in the
    /// `Active` state indefinitely, and the liveness criteria for silent peers
    /// will never take effect.
    ///
    /// Configuration is resolved from: proposal config > scope config > global default.
    /// If no config is provided, the scope's default configuration is used.
    ///
    /// Equivalent to calling
    /// [`create_proposal_with_config`](Self::create_proposal_with_config) with
    /// `None` as the override; any error from that call is returned unchanged.
    pub async fn create_proposal(
        &self,
        scope: &Scope,
        request: CreateProposalRequest,
    ) -> Result<Proposal, ConsensusError> {
        self.create_proposal_with_config(scope, request, None).await
    }
184
185    /// Create a new proposal with an explicit [`ConsensusConfig`] override.
186    ///
187    /// Pass `None` to fall back to scope defaults (same as [`create_proposal`](Self::create_proposal)).
188    pub async fn create_proposal_with_config(
189        &self,
190        scope: &Scope,
191        request: CreateProposalRequest,
192        config: Option<ConsensusConfig>,
193    ) -> Result<Proposal, ConsensusError> {
194        let proposal = request.into_proposal()?;
195        let config = self.resolve_config(scope, config, Some(&proposal)).await?;
196        let (session, _) =
197            ConsensusSession::from_proposal::<Signer>(proposal.clone(), config.clone())?;
198        self.save_session(scope, session).await?;
199        self.trim_scope_sessions(scope).await?;
200        Ok(proposal)
201    }
202
203    /// Cast a vote on an active proposal using this service's held signer.
204    ///
205    /// The vote is cryptographically signed and linked into the hashgraph
206    /// chain. Returns the signed [`Vote`] for network propagation. Each peer
207    /// (identity) can only vote once per proposal.
208    pub async fn cast_vote(
209        &self,
210        scope: &Scope,
211        proposal_id: u32,
212        choice: bool,
213    ) -> Result<Vote, ConsensusError> {
214        let session = self.get_session(scope, proposal_id).await?;
215        validate_proposal_timestamp(session.proposal.expiration_timestamp)?;
216
217        if session.votes.contains_key(self.signer.identity()) {
218            return Err(ConsensusError::UserAlreadyVoted);
219        }
220
221        let vote = build_vote(&session.proposal, choice, &self.signer).await?;
222        let vote_clone = vote.clone();
223        let transition = self
224            .update_session(scope, proposal_id, move |session| {
225                session.add_vote(vote_clone)
226            })
227            .await?;
228        self.handle_transition(scope, proposal_id, transition);
229        Ok(vote)
230    }
231
232    /// Cast a vote and return the updated [`Proposal`] (with the new vote included).
233    ///
234    /// Convenience method useful for the proposal creator who wants to immediately
235    /// gossip the updated proposal to peers.
236    pub async fn cast_vote_and_get_proposal(
237        &self,
238        scope: &Scope,
239        proposal_id: u32,
240        choice: bool,
241    ) -> Result<Proposal, ConsensusError> {
242        self.cast_vote(scope, proposal_id, choice).await?;
243        let session = self.get_session(scope, proposal_id).await?;
244        Ok(session.proposal)
245    }
246
247    /// Process a proposal received from the network.
248    ///
249    /// Call this when your networking layer delivers a proposal from another peer.
250    /// The library performs no I/O — your application must handle gossip/transport
251    /// and call this method on receipt.
252    ///
253    /// Validates the proposal and all embedded votes, then stores it locally.
254    /// If enough votes are already present, consensus is reached immediately.
255    pub async fn process_incoming_proposal(
256        &self,
257        scope: &Scope,
258        proposal: Proposal,
259    ) -> Result<(), ConsensusError> {
260        if self.get_session(scope, proposal.proposal_id).await.is_ok() {
261            return Err(ConsensusError::ProposalAlreadyExist);
262        }
263        let config = self.resolve_config(scope, None, Some(&proposal)).await?;
264        let (session, transition) = ConsensusSession::from_proposal::<Signer>(proposal, config)?;
265        self.handle_transition(scope, session.proposal.proposal_id, transition);
266        self.save_session(scope, session).await?;
267        self.trim_scope_sessions(scope).await?;
268        Ok(())
269    }
270
271    /// Process a single vote received from the network.
272    ///
273    /// Call this when your networking layer delivers a vote from another peer.
274    /// Validates the vote (signature, timestamp, chain) and adds it to the
275    /// corresponding proposal session. May trigger consensus.
276    pub async fn process_incoming_vote(
277        &self,
278        scope: &Scope,
279        vote: Vote,
280    ) -> Result<(), ConsensusError> {
281        let session = self.get_session(scope, vote.proposal_id).await?;
282        validate_vote::<Signer>(
283            &vote,
284            session.proposal.expiration_timestamp,
285            session.proposal.timestamp,
286        )?;
287        let proposal_id = vote.proposal_id;
288        let transition = self
289            .update_session(scope, proposal_id, move |session| session.add_vote(vote))
290            .await?;
291        self.handle_transition(scope, proposal_id, transition);
292        Ok(())
293    }
294
295    /// Handle the timeout for a proposal.
296    ///
297    /// **The library does not call this automatically.** Your application MUST
298    /// schedule a timer (e.g. `tokio::time::sleep(config.consensus_timeout())`)
299    /// and invoke this method when it fires. Without this call, proposals with
300    /// offline voters will stay in the `Active` state forever and silent-peer
301    /// liveness logic will never run.
302    ///
303    /// At timeout, silent peers are counted toward quorum using the proposal's
304    /// `liveness_criteria_yes` flag (RFC Section 4, Silent Node Management):
305    ///
306    /// - `liveness_criteria_yes = true` — silent peers count as YES
307    /// - `liveness_criteria_yes = false` — silent peers count as NO
308    ///
309    /// Returns the consensus result if determinable, or
310    /// [`InsufficientVotesAtTimeout`](ConsensusError::InsufficientVotesAtTimeout)
311    /// if the result is a tie after counting silent peers.
312    pub async fn handle_consensus_timeout(
313        &self,
314        scope: &Scope,
315        proposal_id: u32,
316    ) -> Result<bool, ConsensusError> {
317        let timeout_result: Result<Option<bool>, ConsensusError> = self
318            .update_session(scope, proposal_id, |session| {
319                if let ConsensusState::ConsensusReached(result) = session.state {
320                    return Ok(Some(result));
321                }
322                let result = calculate_consensus_result(
323                    &session.votes,
324                    session.proposal.expected_voters_count,
325                    session.config.consensus_threshold(),
326                    session.proposal.liveness_criteria_yes,
327                    true,
328                );
329                if let Some(result) = result {
330                    session.state = ConsensusState::ConsensusReached(result);
331                    Ok(Some(result))
332                } else {
333                    session.state = ConsensusState::Failed;
334                    Ok(None)
335                }
336            })
337            .await;
338
339        match timeout_result? {
340            Some(consensus_result) => {
341                self.emit_event(
342                    scope,
343                    ConsensusEvent::ConsensusReached {
344                        proposal_id,
345                        result: consensus_result,
346                        timestamp: current_timestamp()?,
347                    },
348                );
349                Ok(consensus_result)
350            }
351            None => {
352                self.emit_event(
353                    scope,
354                    ConsensusEvent::ConsensusFailed {
355                        proposal_id,
356                        timestamp: current_timestamp()?,
357                    },
358                );
359                Err(ConsensusError::InsufficientVotesAtTimeout)
360            }
361        }
362    }
363
364    // ── Scope management ─────────────────────────────────────────────
365
366    /// Get a builder for a scope configuration.
367    ///
368    /// # Example
369    /// ```rust,no_run
370    /// use hashgraph_like_consensus::{
371    ///     scope::ScopeID,
372    ///     scope_config::NetworkType,
373    ///     service::DefaultConsensusService,
374    ///     signing::EthereumConsensusSigner,
375    /// };
376    /// use alloy::signers::local::PrivateKeySigner;
377    /// use std::time::Duration;
378    ///
379    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
380    ///   let signer = EthereumConsensusSigner::new(PrivateKeySigner::random());
381    ///   let service = DefaultConsensusService::new(signer);
382    ///   let scope = ScopeID::from("my_scope");
383    ///
384    ///   // Initialize new scope
385    ///   service
386    ///     .scope(&scope)
387    ///     .await?
388    ///     .with_network_type(NetworkType::P2P)
389    ///     .with_threshold(0.75)
390    ///     .with_timeout(Duration::from_secs(120))
391    ///     .initialize()
392    ///     .await?;
393    ///
394    ///   // Update existing scope (single field)
395    ///   service
396    ///     .scope(&scope)
397    ///     .await?
398    ///     .with_threshold(0.8)
399    ///     .update()
400    ///     .await?;
401    ///   Ok(())
402    /// }
403    /// ```
404    pub async fn scope(
405        &self,
406        scope: &Scope,
407    ) -> Result<ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>, ConsensusError> {
408        let existing_config = self.storage.get_scope_config(scope).await?;
409        let builder = if let Some(config) = existing_config {
410            ScopeConfigBuilder::from_existing(config)
411        } else {
412            ScopeConfigBuilder::new()
413        };
414        Ok(ScopeConfigBuilderWrapper::new(
415            self.clone(),
416            scope.clone(),
417            builder,
418        ))
419    }
420
    /// Validate `config` and store it as the configuration of `scope`.
    ///
    /// Validation runs first; an invalid configuration is returned as an error
    /// and nothing is written to storage.
    async fn initialize_scope(
        &self,
        scope: &Scope,
        config: ScopeConfig,
    ) -> Result<(), ConsensusError> {
        config.validate()?;
        self.storage.set_scope_config(scope, config).await
    }
429
    /// Apply `updater` to the stored configuration of `scope`.
    ///
    /// Thin pass-through to the storage backend's `update_scope_config`; the
    /// backend's result is returned unchanged.
    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
    where
        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
    {
        self.storage.update_scope_config(scope, updater).await
    }
436
437    /// Resolve configuration for a proposal.
438    ///
439    /// Priority: proposal override > proposal fields (expiration_timestamp, liveness_criteria_yes)
440    ///   > scope config > global default
441    async fn resolve_config(
442        &self,
443        scope: &Scope,
444        proposal_override: Option<ConsensusConfig>,
445        proposal: Option<&Proposal>,
446    ) -> Result<ConsensusConfig, ConsensusError> {
447        // 1. If explicit config override exists, use it as base
448        // NOTE: if a per-proposal override is provided, we should not stomp its timeout
449        // from the proposal's expiration fields (the caller explicitly chose it).
450        let has_explicit_override = proposal_override.is_some();
451        let base_config = if let Some(override_config) = proposal_override {
452            override_config
453        } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
454            ConsensusConfig::from(scope_config)
455        } else {
456            ConsensusConfig::gossipsub()
457        };
458
459        // 2. Apply proposal field overrides if proposal is provided
460        if let Some(prop) = proposal {
461            // Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
462            // unless an explicit override was supplied.
463            let timeout_seconds = if has_explicit_override {
464                base_config.consensus_timeout()
465            } else if prop.expiration_timestamp > prop.timestamp {
466                Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
467            } else {
468                base_config.consensus_timeout()
469            };
470
471            Ok(ConsensusConfig::new(
472                base_config.consensus_threshold(),
473                timeout_seconds,
474                base_config.max_rounds(),
475                base_config.use_gossipsub_rounds(),
476                prop.liveness_criteria_yes,
477            ))
478        } else {
479            Ok(base_config)
480        }
481    }
482
483    async fn get_session(
484        &self,
485        scope: &Scope,
486        proposal_id: u32,
487    ) -> Result<ConsensusSession, ConsensusError> {
488        self.storage
489            .get_session(scope, proposal_id)
490            .await?
491            .ok_or(ConsensusError::SessionNotFound)
492    }
493
    /// Run `mutator` against the stored session for `proposal_id` in `scope`.
    ///
    /// Thin pass-through to the storage backend's `update_session`; whatever the
    /// backend returns (the mutator's value or an error) is returned unchanged.
    async fn update_session<R, F>(
        &self,
        scope: &Scope,
        proposal_id: u32,
        mutator: F,
    ) -> Result<R, ConsensusError>
    where
        R: Send,
        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
    {
        self.storage
            .update_session(scope, proposal_id, mutator)
            .await
    }
508
    /// Hand `session` to the storage backend to be saved under `scope`.
    ///
    /// Thin pass-through to the backend's `save_session`.
    async fn save_session(
        &self,
        scope: &Scope,
        session: ConsensusSession,
    ) -> Result<(), ConsensusError> {
        self.storage.save_session(scope, session).await
    }
516
517    async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
518        self.storage
519            .update_scope_sessions(scope, |sessions| {
520                if sessions.len() <= self.max_sessions_per_scope {
521                    return Ok(());
522                }
523
524                sessions.sort_by_key(|s| std::cmp::Reverse(s.created_at));
525                sessions.truncate(self.max_sessions_per_scope);
526                Ok(())
527            })
528            .await
529    }
530
531    pub(crate) async fn list_scope_sessions(
532        &self,
533        scope: &Scope,
534    ) -> Result<Vec<ConsensusSession>, ConsensusError> {
535        self.storage
536            .list_scope_sessions(scope)
537            .await?
538            .ok_or(ConsensusError::ScopeNotFound)
539    }
540
541    fn handle_transition(&self, scope: &Scope, proposal_id: u32, transition: SessionTransition) {
542        if let SessionTransition::ConsensusReached(result) = transition {
543            self.emit_event(
544                scope,
545                ConsensusEvent::ConsensusReached {
546                    proposal_id,
547                    result,
548                    timestamp: current_timestamp().unwrap_or(0),
549                },
550            );
551        }
552    }
553
    /// Publish `event` on the event bus under a clone of `scope`.
    fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
        self.event_bus.publish(scope.clone(), event);
    }
557}
558
/// Wrapper around ScopeConfigBuilder that stores service and scope for convenience methods.
///
/// Obtained from [`ConsensusService::scope`]; the builder methods adjust the
/// pending configuration, and `initialize` / `update` write it through the
/// stored service.
pub struct ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
where
    Scope: ConsensusScope,
    Storage: ConsensusStorage<Scope>,
    Event: ConsensusEventBus<Scope>,
    Signer: ConsensusSignatureScheme,
{
    /// Service through which the finished configuration is written.
    service: ConsensusService<Scope, Storage, Event, Signer>,
    /// Scope whose configuration is being built.
    scope: Scope,
    /// Pending configuration, modified by the builder-style methods.
    builder: ScopeConfigBuilder,
}
571
572impl<Scope, Storage, Event, Signer> ScopeConfigBuilderWrapper<Scope, Storage, Event, Signer>
573where
574    Scope: ConsensusScope,
575    Storage: ConsensusStorage<Scope>,
576    Event: ConsensusEventBus<Scope>,
577    Signer: ConsensusSignatureScheme,
578{
579    fn new(
580        service: ConsensusService<Scope, Storage, Event, Signer>,
581        scope: Scope,
582        builder: ScopeConfigBuilder,
583    ) -> Self {
584        Self {
585            service,
586            scope,
587            builder,
588        }
589    }
590
591    /// Set network type (P2P or Gossipsub)
592    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
593        self.builder = self.builder.with_network_type(network_type);
594        self
595    }
596
597    /// Set consensus threshold (0.0 to 1.0)
598    pub fn with_threshold(mut self, threshold: f64) -> Self {
599        self.builder = self.builder.with_threshold(threshold);
600        self
601    }
602
603    /// Set default timeout for proposals (in seconds)
604    pub fn with_timeout(mut self, timeout: Duration) -> Self {
605        self.builder = self.builder.with_timeout(timeout);
606        self
607    }
608
609    /// Set liveness criteria (how silent peers are counted)
610    pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
611        self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
612        self
613    }
614
615    /// Override max rounds (if None, uses network_type defaults)
616    pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
617        self.builder = self.builder.with_max_rounds(max_rounds);
618        self
619    }
620
621    /// Use P2P preset with common defaults
622    pub fn p2p_preset(mut self) -> Self {
623        self.builder = self.builder.p2p_preset();
624        self
625    }
626
627    /// Use Gossipsub preset with common defaults
628    pub fn gossipsub_preset(mut self) -> Self {
629        self.builder = self.builder.gossipsub_preset();
630        self
631    }
632
633    /// Use strict consensus (higher threshold = 0.9)
634    pub fn strict_consensus(mut self) -> Self {
635        self.builder = self.builder.strict_consensus();
636        self
637    }
638
639    /// Use fast consensus (lower threshold = 0.6, shorter timeout = 30s)
640    pub fn fast_consensus(mut self) -> Self {
641        self.builder = self.builder.fast_consensus();
642        self
643    }
644
645    /// Start with network-specific defaults
646    pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
647        self.builder = self.builder.with_network_defaults(network_type);
648        self
649    }
650
651    /// Initialize scope with the built configuration
652    pub async fn initialize(self) -> Result<(), ConsensusError> {
653        let config = self.builder.build()?;
654        self.service.initialize_scope(&self.scope, config).await
655    }
656
657    /// Update existing scope configuration with the built configuration
658    pub async fn update(self) -> Result<(), ConsensusError> {
659        let config = self.builder.build()?;
660        self.service
661            .update_scope_config(&self.scope, |existing| {
662                *existing = config;
663                Ok(())
664            })
665            .await
666    }
667
668    /// Get the current configuration (useful for testing)
669    pub fn get_config(&self) -> ScopeConfig {
670        self.builder.get_config()
671    }
672}