hashgraph_like_consensus/
service.rs

1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5    error::ConsensusError,
6    events::{BroadcastEventBus, ConsensusEventBus},
7    protos::consensus::v1::Proposal,
8    scope::{ConsensusScope, ScopeID},
9    scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10    session::{ConsensusConfig, ConsensusSession, ConsensusState},
11    storage::{ConsensusStorage, InMemoryConsensusStorage},
12    types::{ConsensusEvent, SessionTransition},
13    utils::{calculate_consensus_result, has_sufficient_votes},
14};
/// The main service that handles proposals, votes, and consensus.
///
/// This is the main entry point for using the consensus service.
/// It handles creating proposals, processing votes, and managing timeouts.
///
/// Generic over the scope type (`Scope`), the persistence backend (`S`), and
/// the event bus (`E`) so callers can plug in custom storage or event delivery.
pub struct ConsensusService<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    /// Backend used to persist sessions and scope configurations.
    storage: S,
    /// Upper bound on sessions kept per scope; see `trim_scope_sessions`.
    max_sessions_per_scope: usize,
    /// Bus on which consensus lifecycle events are published.
    event_bus: E,
    /// Ties the service to its scope type without storing a scope value.
    _scope: PhantomData<Scope>,
}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33    Scope: ConsensusScope,
34    S: ConsensusStorage<Scope>,
35    E: ConsensusEventBus<Scope>,
36{
37    fn clone(&self) -> Self {
38        Self {
39            storage: self.storage.clone(),
40            max_sessions_per_scope: self.max_sessions_per_scope,
41            event_bus: self.event_bus.clone(),
42            _scope: PhantomData,
43        }
44    }
45}
46
/// A ready-to-use service with in-memory storage and broadcast events.
///
/// This is the easiest way to get started. It stores everything in memory (great for
/// testing or single-node setups) and uses a simple broadcast channel for events.
/// Scopes are identified by [`ScopeID`].
/// If you need persistence or custom event handling, use `ConsensusService` directly.
pub type DefaultConsensusService =
    ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56    /// Create a service with default settings (10 max sessions per scope).
57    fn new() -> Self {
58        Self::new_with_max_sessions(10)
59    }
60
61    /// Create a service with a custom limit on how many sessions can exist per scope.
62    ///
63    /// When the limit is reached, older sessions are automatically removed to make room.
64    pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65        Self::new_with_components(
66            InMemoryConsensusStorage::new(),
67            BroadcastEventBus::default(),
68            max_sessions_per_scope,
69        )
70    }
71}
72
73impl Default for DefaultConsensusService {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81    Scope: ConsensusScope,
82    S: ConsensusStorage<Scope>,
83    E: ConsensusEventBus<Scope>,
84{
85    /// Build a service with your own storage and event bus implementations.
86    ///
87    /// Use this when you need custom persistence (like a database) or event handling.
88    /// The `max_sessions_per_scope` parameter controls how many sessions can exist per scope.
89    /// When the limit is reached, older sessions are automatically removed.
90    pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91        Self {
92            storage,
93            max_sessions_per_scope,
94            event_bus,
95            _scope: PhantomData,
96        }
97    }
98
99    /// Subscribe to events like consensus reached or consensus failed.
100    ///
101    /// Returns a receiver that you can use to listen for events across all scopes.
102    /// Events are broadcast to all subscribers, so multiple parts of your application
103    /// can react to consensus outcomes.
104    pub fn subscribe_to_events(&self) -> E::Receiver {
105        self.event_bus.subscribe()
106    }
107
108    /// Get the final consensus result for a proposal, if it's been reached.
109    ///
110    /// Returns `Ok(true)` if consensus was YES, `Ok(false)` if NO, or `Err` if
111    /// consensus hasn't been reached yet (or the proposal doesn't exist or is still active).
112    pub async fn get_consensus_result(
113        &self,
114        scope: &Scope,
115        proposal_id: u32,
116    ) -> Result<bool, ConsensusError> {
117        let session = self
118            .storage
119            .get_session(scope, proposal_id)
120            .await?
121            .ok_or(ConsensusError::SessionNotFound)?;
122
123        match session.state {
124            ConsensusState::ConsensusReached(result) => Ok(result),
125            ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126            ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127        }
128    }
129
130    /// Get all proposals that are still accepting votes.
131    pub async fn get_active_proposals(
132        &self,
133        scope: &Scope,
134    ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135        let sessions = self
136            .storage
137            .list_scope_sessions(scope)
138            .await?
139            .ok_or(ConsensusError::ScopeNotFound)?;
140        let result = sessions
141            .into_iter()
142            .filter_map(|session| session.is_active().then_some(session.proposal))
143            .collect::<Vec<Proposal>>();
144        if result.is_empty() {
145            return Ok(None);
146        }
147        Ok(Some(result))
148    }
149
150    /// Get all proposals that have reached consensus, along with their results.
151    ///
152    /// Returns a map from proposal ID to result (`true` for YES, `false` for NO).
153    /// Only includes proposals that have finalized - active proposals are not included.
154    /// Returns `None` if no proposals have reached consensus.
155    pub async fn get_reached_proposals(
156        &self,
157        scope: &Scope,
158    ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
159        let sessions = self
160            .storage
161            .list_scope_sessions(scope)
162            .await?
163            .ok_or(ConsensusError::ScopeNotFound)?;
164
165        let result = sessions
166            .into_iter()
167            .filter_map(|session| {
168                session
169                    .get_consensus_result()
170                    .ok()
171                    .map(|result| (session.proposal.proposal_id, result))
172            })
173            .collect::<HashMap<u32, bool>>();
174        if result.is_empty() {
175            return Ok(None);
176        }
177        Ok(Some(result))
178    }
179
180    /// Check if a proposal has collected enough votes to reach consensus.
181    pub async fn has_sufficient_votes_for_proposal(
182        &self,
183        scope: &Scope,
184        proposal_id: u32,
185    ) -> Result<bool, ConsensusError> {
186        let session = self.get_session(scope, proposal_id).await?;
187        let total_votes = session.votes.len() as u32;
188        let expected_voters = session.proposal.expected_voters_count;
189        Ok(has_sufficient_votes(
190            total_votes,
191            expected_voters,
192            session.config.consensus_threshold(),
193        ))
194    }
195
196    // Scope management methods
197
198    /// Get a builder for a scope configuration.
199    ///
200    /// # Example
201    /// ```rust,no_run
202    /// use hashgraph_like_consensus::{scope_config::NetworkType, scope::ScopeID, service::DefaultConsensusService};
203    /// use std::time::Duration;
204    ///
205    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
206    ///   let service = DefaultConsensusService::default();
207    ///   let scope = ScopeID::from("my_scope");
208    ///
209    ///   // Initialize new scope
210    ///   service
211    ///     .scope(&scope)
212    ///     .await?
213    ///     .with_network_type(NetworkType::P2P)
214    ///     .with_threshold(0.75)
215    ///     .with_timeout(Duration::from_secs(120))
216    ///     .initialize()
217    ///     .await?;
218    ///
219    ///   // Update existing scope (single field)
220    ///   service
221    ///     .scope(&scope)
222    ///     .await?
223    ///     .with_threshold(0.8)
224    ///     .update()
225    ///     .await?;
226    ///   Ok(())
227    /// }
228    /// ```
229    pub async fn scope(
230        &self,
231        scope: &Scope,
232    ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
233        let existing_config = self.storage.get_scope_config(scope).await?;
234        let builder = if let Some(config) = existing_config {
235            ScopeConfigBuilder::from_existing(config)
236        } else {
237            ScopeConfigBuilder::new()
238        };
239        Ok(ScopeConfigBuilderWrapper::new(
240            self.clone(),
241            scope.clone(),
242            builder,
243        ))
244    }
245
246    async fn initialize_scope(
247        &self,
248        scope: &Scope,
249        config: ScopeConfig,
250    ) -> Result<(), ConsensusError> {
251        config.validate()?;
252        self.storage.set_scope_config(scope, config).await
253    }
254
255    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
256    where
257        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
258    {
259        self.storage.update_scope_config(scope, updater).await
260    }
261
262    /// Resolve configuration for a proposal.
263    ///
264    /// Priority: proposal override > proposal fields (expiration_timestamp, liveness_criteria_yes)
265    ///   > scope config > global default
266    pub(crate) async fn resolve_config(
267        &self,
268        scope: &Scope,
269        proposal_override: Option<ConsensusConfig>,
270        proposal: Option<&Proposal>,
271    ) -> Result<ConsensusConfig, ConsensusError> {
272        // 1. If explicit config override exists, use it as base
273        let base_config = if let Some(override_config) = proposal_override {
274            override_config
275        } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
276            ConsensusConfig::from(scope_config)
277        } else {
278            ConsensusConfig::gossipsub()
279        };
280
281        // 2. Apply proposal field overrides if proposal is provided
282        if let Some(prop) = proposal {
283            // Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time)
284            let timeout_seconds = if prop.expiration_timestamp > prop.timestamp {
285                Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
286            } else {
287                base_config.consensus_timeout()
288            };
289
290            Ok(ConsensusConfig::new(
291                base_config.consensus_threshold(),
292                timeout_seconds,
293                base_config.max_rounds(),
294                base_config.use_gossipsub_rounds(),
295                prop.liveness_criteria_yes,
296            ))
297        } else {
298            Ok(base_config)
299        }
300    }
301
302    /// Handle the timeout for a proposal.
303    ///
304    /// First checks if consensus has already been reached and returns the result if so.
305    /// Otherwise, calculates consensus from current votes. If consensus is reached, marks
306    /// the session as ConsensusReached and returns the result. If no consensus, marks the
307    /// session as Failed and returns an error.
308    pub async fn handle_consensus_timeout(
309        &self,
310        scope: &Scope,
311        proposal_id: u32,
312    ) -> Result<bool, ConsensusError> {
313        let timeout_result: Result<Option<bool>, ConsensusError> = self
314            .update_session(scope, proposal_id, |session| {
315                if let ConsensusState::ConsensusReached(result) = session.state {
316                    return Ok(Some(result));
317                }
318
319                // Try to calculate consensus result first - if we have enough votes, return the result
320                // even if the proposal has technically expired
321                let result = calculate_consensus_result(
322                    &session.votes,
323                    session.proposal.expected_voters_count,
324                    session.config.consensus_threshold(),
325                    session.proposal.liveness_criteria_yes,
326                );
327
328                if let Some(result) = result {
329                    session.state = ConsensusState::ConsensusReached(result);
330                    Ok(Some(result))
331                } else {
332                    session.state = ConsensusState::Failed;
333                    Ok(None)
334                }
335            })
336            .await;
337
338        match timeout_result? {
339            Some(consensus_result) => {
340                self.emit_event(
341                    scope,
342                    ConsensusEvent::ConsensusReached {
343                        proposal_id,
344                        result: consensus_result,
345                    },
346                );
347                Ok(consensus_result)
348            }
349            None => {
350                self.emit_event(scope, ConsensusEvent::ConsensusFailed { proposal_id });
351                Err(ConsensusError::InsufficientVotesAtTimeout)
352            }
353        }
354    }
355
356    pub(crate) async fn get_session(
357        &self,
358        scope: &Scope,
359        proposal_id: u32,
360    ) -> Result<ConsensusSession, ConsensusError> {
361        self.storage
362            .get_session(scope, proposal_id)
363            .await?
364            .ok_or(ConsensusError::SessionNotFound)
365    }
366
367    pub(crate) async fn update_session<R, F>(
368        &self,
369        scope: &Scope,
370        proposal_id: u32,
371        mutator: F,
372    ) -> Result<R, ConsensusError>
373    where
374        R: Send,
375        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
376    {
377        self.storage
378            .update_session(scope, proposal_id, mutator)
379            .await
380    }
381
382    pub(crate) async fn save_session(
383        &self,
384        scope: &Scope,
385        session: ConsensusSession,
386    ) -> Result<(), ConsensusError> {
387        self.storage.save_session(scope, session).await
388    }
389
390    pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
391        self.storage
392            .update_scope_sessions(scope, |sessions| {
393                if sessions.len() <= self.max_sessions_per_scope {
394                    return Ok(());
395                }
396
397                sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
398                sessions.truncate(self.max_sessions_per_scope);
399                Ok(())
400            })
401            .await
402    }
403
404    pub(crate) async fn list_scope_sessions(
405        &self,
406        scope: &Scope,
407    ) -> Result<Vec<ConsensusSession>, ConsensusError> {
408        self.storage
409            .list_scope_sessions(scope)
410            .await?
411            .ok_or(ConsensusError::ScopeNotFound)
412    }
413
414    pub(crate) fn handle_transition(
415        &self,
416        scope: &Scope,
417        proposal_id: u32,
418        transition: SessionTransition,
419    ) {
420        if let SessionTransition::ConsensusReached(result) = transition {
421            self.emit_event(
422                scope,
423                ConsensusEvent::ConsensusReached {
424                    proposal_id,
425                    result,
426                },
427            );
428        }
429    }
430
    /// Publish `event` on the event bus, tagged with a clone of `scope`.
    fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
        self.event_bus.publish(scope.clone(), event);
    }
434}
435
/// Wrapper around ScopeConfigBuilder that stores service and scope for convenience methods.
pub struct ScopeConfigBuilderWrapper<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    /// Cloned service handle used by `initialize`/`update` to persist the config.
    service: ConsensusService<Scope, S, E>,
    /// The scope whose configuration is being built.
    scope: Scope,
    /// The underlying configuration builder being customized.
    builder: ScopeConfigBuilder,
}
447
448impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
449where
450    Scope: ConsensusScope,
451    S: ConsensusStorage<Scope>,
452    E: ConsensusEventBus<Scope>,
453{
454    fn new(
455        service: ConsensusService<Scope, S, E>,
456        scope: Scope,
457        builder: ScopeConfigBuilder,
458    ) -> Self {
459        Self {
460            service,
461            scope,
462            builder,
463        }
464    }
465
466    /// Set network type (P2P or Gossipsub)
467    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
468        self.builder = self.builder.with_network_type(network_type);
469        self
470    }
471
472    /// Set consensus threshold (0.0 to 1.0)
473    pub fn with_threshold(mut self, threshold: f64) -> Self {
474        self.builder = self.builder.with_threshold(threshold);
475        self
476    }
477
478    /// Set default timeout for proposals (in seconds)
479    pub fn with_timeout(mut self, timeout: Duration) -> Self {
480        self.builder = self.builder.with_timeout(timeout);
481        self
482    }
483
484    /// Set liveness criteria (how silent peers are counted)
485    pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
486        self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
487        self
488    }
489
490    /// Override max rounds (if None, uses network_type defaults)
491    pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
492        self.builder = self.builder.with_max_rounds(max_rounds);
493        self
494    }
495
496    /// Use P2P preset with common defaults
497    pub fn p2p_preset(mut self) -> Self {
498        self.builder = self.builder.p2p_preset();
499        self
500    }
501
502    /// Use Gossipsub preset with common defaults
503    pub fn gossipsub_preset(mut self) -> Self {
504        self.builder = self.builder.gossipsub_preset();
505        self
506    }
507
508    /// Use strict consensus (higher threshold = 0.9)
509    pub fn strict_consensus(mut self) -> Self {
510        self.builder = self.builder.strict_consensus();
511        self
512    }
513
514    /// Use fast consensus (lower threshold = 0.6, shorter timeout = 30s)
515    pub fn fast_consensus(mut self) -> Self {
516        self.builder = self.builder.fast_consensus();
517        self
518    }
519
520    /// Start with network-specific defaults
521    pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
522        self.builder = self.builder.with_network_defaults(network_type);
523        self
524    }
525
526    /// Initialize scope with the built configuration
527    pub async fn initialize(self) -> Result<(), ConsensusError> {
528        let config = self.builder.build()?;
529        self.service.initialize_scope(&self.scope, config).await
530    }
531
532    /// Update existing scope configuration with the built configuration
533    pub async fn update(self) -> Result<(), ConsensusError> {
534        let config = self.builder.build()?;
535        self.service
536            .update_scope_config(&self.scope, |existing| {
537                *existing = config;
538                Ok(())
539            })
540            .await
541    }
542
543    /// Get the current configuration (useful for testing)
544    pub fn get_config(&self) -> ScopeConfig {
545        self.builder.get_config()
546    }
547}