Skip to main content

hashgraph_like_consensus/
service.rs

1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5    error::ConsensusError,
6    events::{BroadcastEventBus, ConsensusEventBus},
7    protos::consensus::v1::Proposal,
8    scope::{ConsensusScope, ScopeID},
9    scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10    session::{ConsensusConfig, ConsensusSession, ConsensusState},
11    storage::{ConsensusStorage, InMemoryConsensusStorage},
12    types::{ConsensusEvent, SessionTransition},
13    utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
14};
15/// The main service that handles proposals, votes, and consensus.
16///
17/// This is the main entry point for using the consensus service.
18/// It handles creating proposals, processing votes, and managing timeouts.
19pub struct ConsensusService<Scope, S, E>
20where
21    Scope: ConsensusScope,
22    S: ConsensusStorage<Scope>,
23    E: ConsensusEventBus<Scope>,
24{
25    storage: S,
26    max_sessions_per_scope: usize,
27    event_bus: E,
28    _scope: PhantomData<Scope>,
29}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33    Scope: ConsensusScope,
34    S: ConsensusStorage<Scope>,
35    E: ConsensusEventBus<Scope>,
36{
37    fn clone(&self) -> Self {
38        Self {
39            storage: self.storage.clone(),
40            max_sessions_per_scope: self.max_sessions_per_scope,
41            event_bus: self.event_bus.clone(),
42            _scope: PhantomData,
43        }
44    }
45}
46
47/// A ready-to-use service with in-memory storage and broadcast events.
48///
49/// This is the easiest way to get started. It stores everything in memory (great for
50/// testing or single-node setups) and uses a simple broadcast channel for events.
51/// If you need persistence or custom event handling, use `ConsensusService` directly.
52pub type DefaultConsensusService =
53    ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56    /// Create a service with default settings (10 max sessions per scope).
57    fn new() -> Self {
58        Self::new_with_max_sessions(10)
59    }
60
61    /// Create a service with a custom limit on how many sessions can exist per scope.
62    ///
63    /// When the limit is reached, older sessions are automatically removed to make room.
64    pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65        Self::new_with_components(
66            InMemoryConsensusStorage::new(),
67            BroadcastEventBus::default(),
68            max_sessions_per_scope,
69        )
70    }
71}
72
73impl Default for DefaultConsensusService {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81    Scope: ConsensusScope,
82    S: ConsensusStorage<Scope>,
83    E: ConsensusEventBus<Scope>,
84{
85    /// Build a service with your own storage and event bus implementations.
86    ///
87    /// Use this when you need custom persistence (like a database) or event handling.
88    /// The `max_sessions_per_scope` parameter controls how many sessions can exist per scope.
89    /// When the limit is reached, older sessions are automatically removed.
90    pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91        Self {
92            storage,
93            max_sessions_per_scope,
94            event_bus,
95            _scope: PhantomData,
96        }
97    }
98
99    /// Subscribe to events like consensus reached or consensus failed.
100    ///
101    /// Returns a receiver that you can use to listen for events across all scopes.
102    /// Events are broadcast to all subscribers, so multiple parts of your application
103    /// can react to consensus outcomes.
104    pub fn subscribe_to_events(&self) -> E::Receiver {
105        self.event_bus.subscribe()
106    }
107
108    /// Get the final consensus result for a proposal, if it's been reached.
109    ///
110    /// Returns `Ok(true)` if consensus was YES, `Ok(false)` if NO, or `Err` if
111    /// consensus hasn't been reached yet (or the proposal doesn't exist or is still active).
112    pub async fn get_consensus_result(
113        &self,
114        scope: &Scope,
115        proposal_id: u32,
116    ) -> Result<bool, ConsensusError> {
117        let session = self
118            .storage
119            .get_session(scope, proposal_id)
120            .await?
121            .ok_or(ConsensusError::SessionNotFound)?;
122
123        match session.state {
124            ConsensusState::ConsensusReached(result) => Ok(result),
125            ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126            ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127        }
128    }
129
130    /// Get all proposals that are still accepting votes.
131    pub async fn get_active_proposals(
132        &self,
133        scope: &Scope,
134    ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135        let sessions = self
136            .storage
137            .list_scope_sessions(scope)
138            .await?
139            .ok_or(ConsensusError::ScopeNotFound)?;
140        let result = sessions
141            .into_iter()
142            .filter_map(|session| session.is_active().then_some(session.proposal))
143            .collect::<Vec<Proposal>>();
144        if result.is_empty() {
145            return Ok(None);
146        }
147        Ok(Some(result))
148    }
149
150    /// Get the resolved per-proposal consensus configuration for an existing session.
151    pub async fn get_proposal_config(
152        &self,
153        scope: &Scope,
154        proposal_id: u32,
155    ) -> Result<ConsensusConfig, ConsensusError> {
156        let session = self.get_session(scope, proposal_id).await?;
157        Ok(session.config)
158    }
159
160    /// Get all proposals that have reached consensus, along with their results.
161    ///
162    /// Returns a map from proposal ID to result (`true` for YES, `false` for NO).
163    /// Only includes proposals that have finalized - active proposals are not included.
164    /// Returns `None` if no proposals have reached consensus.
165    pub async fn get_reached_proposals(
166        &self,
167        scope: &Scope,
168    ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
169        let sessions = self
170            .storage
171            .list_scope_sessions(scope)
172            .await?
173            .ok_or(ConsensusError::ScopeNotFound)?;
174
175        let result = sessions
176            .into_iter()
177            .filter_map(|session| {
178                session
179                    .get_consensus_result()
180                    .ok()
181                    .map(|result| (session.proposal.proposal_id, result))
182            })
183            .collect::<HashMap<u32, bool>>();
184        if result.is_empty() {
185            return Ok(None);
186        }
187        Ok(Some(result))
188    }
189
190    /// Check if a proposal has collected enough votes to reach consensus.
191    pub async fn has_sufficient_votes_for_proposal(
192        &self,
193        scope: &Scope,
194        proposal_id: u32,
195    ) -> Result<bool, ConsensusError> {
196        let session = self.get_session(scope, proposal_id).await?;
197        let total_votes = session.votes.len() as u32;
198        let expected_voters = session.proposal.expected_voters_count;
199        Ok(has_sufficient_votes(
200            total_votes,
201            expected_voters,
202            session.config.consensus_threshold(),
203        ))
204    }
205
206    // Scope management methods
207
208    /// Get a builder for a scope configuration.
209    ///
210    /// # Example
211    /// ```rust,no_run
212    /// use hashgraph_like_consensus::{scope_config::NetworkType, scope::ScopeID, service::DefaultConsensusService};
213    /// use std::time::Duration;
214    ///
215    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
216    ///   let service = DefaultConsensusService::default();
217    ///   let scope = ScopeID::from("my_scope");
218    ///
219    ///   // Initialize new scope
220    ///   service
221    ///     .scope(&scope)
222    ///     .await?
223    ///     .with_network_type(NetworkType::P2P)
224    ///     .with_threshold(0.75)
225    ///     .with_timeout(Duration::from_secs(120))
226    ///     .initialize()
227    ///     .await?;
228    ///
229    ///   // Update existing scope (single field)
230    ///   service
231    ///     .scope(&scope)
232    ///     .await?
233    ///     .with_threshold(0.8)
234    ///     .update()
235    ///     .await?;
236    ///   Ok(())
237    /// }
238    /// ```
239    pub async fn scope(
240        &self,
241        scope: &Scope,
242    ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
243        let existing_config = self.storage.get_scope_config(scope).await?;
244        let builder = if let Some(config) = existing_config {
245            ScopeConfigBuilder::from_existing(config)
246        } else {
247            ScopeConfigBuilder::new()
248        };
249        Ok(ScopeConfigBuilderWrapper::new(
250            self.clone(),
251            scope.clone(),
252            builder,
253        ))
254    }
255
256    async fn initialize_scope(
257        &self,
258        scope: &Scope,
259        config: ScopeConfig,
260    ) -> Result<(), ConsensusError> {
261        config.validate()?;
262        self.storage.set_scope_config(scope, config).await
263    }
264
265    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
266    where
267        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
268    {
269        self.storage.update_scope_config(scope, updater).await
270    }
271
272    /// Resolve configuration for a proposal.
273    ///
274    /// Priority: proposal override > proposal fields (expiration_timestamp, liveness_criteria_yes)
275    ///   > scope config > global default
276    pub(crate) async fn resolve_config(
277        &self,
278        scope: &Scope,
279        proposal_override: Option<ConsensusConfig>,
280        proposal: Option<&Proposal>,
281    ) -> Result<ConsensusConfig, ConsensusError> {
282        // 1. If explicit config override exists, use it as base
283        // NOTE: if a per-proposal override is provided, we should not stomp its timeout
284        // from the proposal's expiration fields (the caller explicitly chose it).
285        let has_explicit_override = proposal_override.is_some();
286        let base_config = if let Some(override_config) = proposal_override {
287            override_config
288        } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
289            ConsensusConfig::from(scope_config)
290        } else {
291            ConsensusConfig::gossipsub()
292        };
293
294        // 2. Apply proposal field overrides if proposal is provided
295        if let Some(prop) = proposal {
296            // Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
297            // unless an explicit override was supplied.
298            let timeout_seconds = if has_explicit_override {
299                base_config.consensus_timeout()
300            } else if prop.expiration_timestamp > prop.timestamp {
301                Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
302            } else {
303                base_config.consensus_timeout()
304            };
305
306            Ok(ConsensusConfig::new(
307                base_config.consensus_threshold(),
308                timeout_seconds,
309                base_config.max_rounds(),
310                base_config.use_gossipsub_rounds(),
311                prop.liveness_criteria_yes,
312            ))
313        } else {
314            Ok(base_config)
315        }
316    }
317
318    /// Handle the timeout for a proposal.
319    ///
320    /// First checks if consensus has already been reached and returns the result if so.
321    /// Otherwise, calculates consensus from current votes. If consensus is reached, marks
322    /// the session as ConsensusReached and returns the result. If no consensus, marks the
323    /// session as Failed and returns an error.
324    pub async fn handle_consensus_timeout(
325        &self,
326        scope: &Scope,
327        proposal_id: u32,
328    ) -> Result<bool, ConsensusError> {
329        let timeout_result: Result<Option<bool>, ConsensusError> = self
330            .update_session(scope, proposal_id, |session| {
331                if let ConsensusState::ConsensusReached(result) = session.state {
332                    return Ok(Some(result));
333                }
334
335                // Try to calculate consensus result first - if we have enough votes, return the result
336                // even if the proposal has technically expired
337                let result = calculate_consensus_result(
338                    &session.votes,
339                    session.proposal.expected_voters_count,
340                    session.config.consensus_threshold(),
341                    session.proposal.liveness_criteria_yes,
342                    true,
343                );
344
345                if let Some(result) = result {
346                    session.state = ConsensusState::ConsensusReached(result);
347                    Ok(Some(result))
348                } else {
349                    session.state = ConsensusState::Failed;
350                    Ok(None)
351                }
352            })
353            .await;
354
355        match timeout_result? {
356            Some(consensus_result) => {
357                self.emit_event(
358                    scope,
359                    ConsensusEvent::ConsensusReached {
360                        proposal_id,
361                        result: consensus_result,
362                        timestamp: current_timestamp()?,
363                    },
364                );
365                Ok(consensus_result)
366            }
367            None => {
368                self.emit_event(
369                    scope,
370                    ConsensusEvent::ConsensusFailed {
371                        proposal_id,
372                        timestamp: current_timestamp()?,
373                    },
374                );
375                Err(ConsensusError::InsufficientVotesAtTimeout)
376            }
377        }
378    }
379
380    pub(crate) async fn get_session(
381        &self,
382        scope: &Scope,
383        proposal_id: u32,
384    ) -> Result<ConsensusSession, ConsensusError> {
385        self.storage
386            .get_session(scope, proposal_id)
387            .await?
388            .ok_or(ConsensusError::SessionNotFound)
389    }
390
391    pub(crate) async fn update_session<R, F>(
392        &self,
393        scope: &Scope,
394        proposal_id: u32,
395        mutator: F,
396    ) -> Result<R, ConsensusError>
397    where
398        R: Send,
399        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
400    {
401        self.storage
402            .update_session(scope, proposal_id, mutator)
403            .await
404    }
405
406    pub(crate) async fn save_session(
407        &self,
408        scope: &Scope,
409        session: ConsensusSession,
410    ) -> Result<(), ConsensusError> {
411        self.storage.save_session(scope, session).await
412    }
413
414    pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
415        self.storage
416            .update_scope_sessions(scope, |sessions| {
417                if sessions.len() <= self.max_sessions_per_scope {
418                    return Ok(());
419                }
420
421                sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
422                sessions.truncate(self.max_sessions_per_scope);
423                Ok(())
424            })
425            .await
426    }
427
428    pub(crate) async fn list_scope_sessions(
429        &self,
430        scope: &Scope,
431    ) -> Result<Vec<ConsensusSession>, ConsensusError> {
432        self.storage
433            .list_scope_sessions(scope)
434            .await?
435            .ok_or(ConsensusError::ScopeNotFound)
436    }
437
438    pub(crate) fn handle_transition(
439        &self,
440        scope: &Scope,
441        proposal_id: u32,
442        transition: SessionTransition,
443    ) {
444        if let SessionTransition::ConsensusReached(result) = transition {
445            self.emit_event(
446                scope,
447                ConsensusEvent::ConsensusReached {
448                    proposal_id,
449                    result,
450                    timestamp: current_timestamp().unwrap_or(0),
451                },
452            );
453        }
454    }
455
456    fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
457        self.event_bus.publish(scope.clone(), event);
458    }
459}
460
461/// Wrapper around ScopeConfigBuilder that stores service and scope for convenience methods.
462pub struct ScopeConfigBuilderWrapper<Scope, S, E>
463where
464    Scope: ConsensusScope,
465    S: ConsensusStorage<Scope>,
466    E: ConsensusEventBus<Scope>,
467{
468    service: ConsensusService<Scope, S, E>,
469    scope: Scope,
470    builder: ScopeConfigBuilder,
471}
472
473impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
474where
475    Scope: ConsensusScope,
476    S: ConsensusStorage<Scope>,
477    E: ConsensusEventBus<Scope>,
478{
479    fn new(
480        service: ConsensusService<Scope, S, E>,
481        scope: Scope,
482        builder: ScopeConfigBuilder,
483    ) -> Self {
484        Self {
485            service,
486            scope,
487            builder,
488        }
489    }
490
491    /// Set network type (P2P or Gossipsub)
492    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
493        self.builder = self.builder.with_network_type(network_type);
494        self
495    }
496
497    /// Set consensus threshold (0.0 to 1.0)
498    pub fn with_threshold(mut self, threshold: f64) -> Self {
499        self.builder = self.builder.with_threshold(threshold);
500        self
501    }
502
503    /// Set default timeout for proposals (in seconds)
504    pub fn with_timeout(mut self, timeout: Duration) -> Self {
505        self.builder = self.builder.with_timeout(timeout);
506        self
507    }
508
509    /// Set liveness criteria (how silent peers are counted)
510    pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
511        self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
512        self
513    }
514
515    /// Override max rounds (if None, uses network_type defaults)
516    pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
517        self.builder = self.builder.with_max_rounds(max_rounds);
518        self
519    }
520
521    /// Use P2P preset with common defaults
522    pub fn p2p_preset(mut self) -> Self {
523        self.builder = self.builder.p2p_preset();
524        self
525    }
526
527    /// Use Gossipsub preset with common defaults
528    pub fn gossipsub_preset(mut self) -> Self {
529        self.builder = self.builder.gossipsub_preset();
530        self
531    }
532
533    /// Use strict consensus (higher threshold = 0.9)
534    pub fn strict_consensus(mut self) -> Self {
535        self.builder = self.builder.strict_consensus();
536        self
537    }
538
539    /// Use fast consensus (lower threshold = 0.6, shorter timeout = 30s)
540    pub fn fast_consensus(mut self) -> Self {
541        self.builder = self.builder.fast_consensus();
542        self
543    }
544
545    /// Start with network-specific defaults
546    pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
547        self.builder = self.builder.with_network_defaults(network_type);
548        self
549    }
550
551    /// Initialize scope with the built configuration
552    pub async fn initialize(self) -> Result<(), ConsensusError> {
553        let config = self.builder.build()?;
554        self.service.initialize_scope(&self.scope, config).await
555    }
556
557    /// Update existing scope configuration with the built configuration
558    pub async fn update(self) -> Result<(), ConsensusError> {
559        let config = self.builder.build()?;
560        self.service
561            .update_scope_config(&self.scope, |existing| {
562                *existing = config;
563                Ok(())
564            })
565            .await
566    }
567
568    /// Get the current configuration (useful for testing)
569    pub fn get_config(&self) -> ScopeConfig {
570        self.builder.get_config()
571    }
572}