Skip to main content

hashgraph_like_consensus/
service.rs

1use std::{collections::HashMap, marker::PhantomData};
2use tokio::time::Duration;
3
4use crate::{
5    error::ConsensusError,
6    events::{BroadcastEventBus, ConsensusEventBus},
7    protos::consensus::v1::Proposal,
8    scope::{ConsensusScope, ScopeID},
9    scope_config::{NetworkType, ScopeConfig, ScopeConfigBuilder},
10    session::{ConsensusConfig, ConsensusSession, ConsensusState},
11    storage::{ConsensusStorage, InMemoryConsensusStorage},
12    types::{ConsensusEvent, SessionTransition},
13    utils::{calculate_consensus_result, current_timestamp, has_sufficient_votes},
14};
/// The main service that handles proposals, votes, and consensus.
///
/// This is the main entry point for using the consensus service.
/// It handles creating proposals, processing votes, and managing timeouts.
///
/// Type parameters:
/// - `Scope`: key type used to group sessions and scope configuration.
/// - `S`: storage backend through which sessions and scope configs are read and written.
/// - `E`: event bus used to publish and subscribe to consensus events.
pub struct ConsensusService<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    /// Storage backend; every session and scope-config access goes through it.
    storage: S,
    /// Number of sessions kept per scope when a scope's sessions are trimmed.
    max_sessions_per_scope: usize,
    /// Bus on which consensus events are published.
    event_bus: E,
    /// Zero-sized marker that ties the `Scope` type parameter to the struct.
    _scope: PhantomData<Scope>,
}
30
31impl<Scope, S, E> Clone for ConsensusService<Scope, S, E>
32where
33    Scope: ConsensusScope,
34    S: ConsensusStorage<Scope>,
35    E: ConsensusEventBus<Scope>,
36{
37    fn clone(&self) -> Self {
38        Self {
39            storage: self.storage.clone(),
40            max_sessions_per_scope: self.max_sessions_per_scope,
41            event_bus: self.event_bus.clone(),
42            _scope: PhantomData,
43        }
44    }
45}
46
/// A ready-to-use service with in-memory storage and broadcast events.
///
/// This is the easiest way to get started. It stores everything in memory (great for
/// testing or single-node setups) and uses a simple broadcast channel for events.
/// If you need persistence or custom event handling, use `ConsensusService` directly.
///
/// Concretely this is `ConsensusService` instantiated with `ScopeID` as the scope key,
/// `InMemoryConsensusStorage` as the storage and `BroadcastEventBus` as the event bus.
pub type DefaultConsensusService =
    ConsensusService<ScopeID, InMemoryConsensusStorage<ScopeID>, BroadcastEventBus<ScopeID>>;
54
55impl DefaultConsensusService {
56    /// Create a service with default settings (10 max sessions per scope).
57    fn new() -> Self {
58        Self::new_with_max_sessions(10)
59    }
60
61    /// Create a service with a custom limit on how many sessions can exist per scope.
62    ///
63    /// When the limit is reached, older sessions are automatically removed to make room.
64    pub fn new_with_max_sessions(max_sessions_per_scope: usize) -> Self {
65        Self::new_with_components(
66            InMemoryConsensusStorage::new(),
67            BroadcastEventBus::default(),
68            max_sessions_per_scope,
69        )
70    }
71}
72
73impl Default for DefaultConsensusService {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl<Scope, S, E> ConsensusService<Scope, S, E>
80where
81    Scope: ConsensusScope,
82    S: ConsensusStorage<Scope>,
83    E: ConsensusEventBus<Scope>,
84{
85    /// Build a service with your own storage and event bus implementations.
86    ///
87    /// Use this when you need custom persistence (like a database) or event handling.
88    /// The `max_sessions_per_scope` parameter controls how many sessions can exist per scope.
89    /// When the limit is reached, older sessions are automatically removed.
90    pub fn new_with_components(storage: S, event_bus: E, max_sessions_per_scope: usize) -> Self {
91        Self {
92            storage,
93            max_sessions_per_scope,
94            event_bus,
95            _scope: PhantomData,
96        }
97    }
98
99    /// Subscribe to events like consensus reached or consensus failed.
100    ///
101    /// Returns a receiver that you can use to listen for events across all scopes.
102    /// Events are broadcast to all subscribers, so multiple parts of your application
103    /// can react to consensus outcomes.
104    pub fn subscribe_to_events(&self) -> E::Receiver {
105        self.event_bus.subscribe()
106    }
107
108    /// Get the final consensus result for a proposal, if it's been reached.
109    ///
110    /// Returns `Ok(true)` if consensus was YES, `Ok(false)` if NO, or `Err` if
111    /// consensus hasn't been reached yet (or the proposal doesn't exist or is still active).
112    pub async fn get_consensus_result(
113        &self,
114        scope: &Scope,
115        proposal_id: u32,
116    ) -> Result<bool, ConsensusError> {
117        let session = self
118            .storage
119            .get_session(scope, proposal_id)
120            .await?
121            .ok_or(ConsensusError::SessionNotFound)?;
122
123        match session.state {
124            ConsensusState::ConsensusReached(result) => Ok(result),
125            ConsensusState::Failed => Err(ConsensusError::ConsensusFailed),
126            ConsensusState::Active => Err(ConsensusError::ConsensusNotReached),
127        }
128    }
129
130    /// Get all proposals that are still accepting votes.
131    pub async fn get_active_proposals(
132        &self,
133        scope: &Scope,
134    ) -> Result<Option<Vec<Proposal>>, ConsensusError> {
135        let sessions = self
136            .storage
137            .list_scope_sessions(scope)
138            .await?
139            .ok_or(ConsensusError::ScopeNotFound)?;
140        let result = sessions
141            .into_iter()
142            .filter_map(|session| session.is_active().then_some(session.proposal))
143            .collect::<Vec<Proposal>>();
144        if result.is_empty() {
145            return Ok(None);
146        }
147        Ok(Some(result))
148    }
149
150    /// Get the resolved per-proposal consensus configuration for an existing session.
151    pub async fn get_proposal_config(
152        &self,
153        scope: &Scope,
154        proposal_id: u32,
155    ) -> Result<ConsensusConfig, ConsensusError> {
156        let session = self.get_session(scope, proposal_id).await?;
157        Ok(session.config)
158    }
159
160    /// Get all proposals that have reached consensus, along with their results.
161    ///
162    /// Returns a map from proposal ID to result (`true` for YES, `false` for NO).
163    /// Only includes proposals that have finalized - active proposals are not included.
164    /// Returns `None` if no proposals have reached consensus.
165    pub async fn get_reached_proposals(
166        &self,
167        scope: &Scope,
168    ) -> Result<Option<HashMap<u32, bool>>, ConsensusError> {
169        let sessions = self
170            .storage
171            .list_scope_sessions(scope)
172            .await?
173            .ok_or(ConsensusError::ScopeNotFound)?;
174
175        let result = sessions
176            .into_iter()
177            .filter_map(|session| {
178                session
179                    .get_consensus_result()
180                    .ok()
181                    .map(|result| (session.proposal.proposal_id, result))
182            })
183            .collect::<HashMap<u32, bool>>();
184        if result.is_empty() {
185            return Ok(None);
186        }
187        Ok(Some(result))
188    }
189
190    /// Check if a proposal has collected enough votes to reach consensus.
191    pub async fn has_sufficient_votes_for_proposal(
192        &self,
193        scope: &Scope,
194        proposal_id: u32,
195    ) -> Result<bool, ConsensusError> {
196        let session = self.get_session(scope, proposal_id).await?;
197        let total_votes = session.votes.len() as u32;
198        let expected_voters = session.proposal.expected_voters_count;
199        Ok(has_sufficient_votes(
200            total_votes,
201            expected_voters,
202            session.config.consensus_threshold(),
203        ))
204    }
205
206    // Scope management methods
207
208    /// Get a builder for a scope configuration.
209    ///
210    /// # Example
211    /// ```rust,no_run
212    /// use hashgraph_like_consensus::{scope_config::NetworkType, scope::ScopeID, service::DefaultConsensusService};
213    /// use std::time::Duration;
214    ///
215    /// async fn example() -> Result<(), Box<dyn std::error::Error>> {
216    ///   let service = DefaultConsensusService::default();
217    ///   let scope = ScopeID::from("my_scope");
218    ///
219    ///   // Initialize new scope
220    ///   service
221    ///     .scope(&scope)
222    ///     .await?
223    ///     .with_network_type(NetworkType::P2P)
224    ///     .with_threshold(0.75)
225    ///     .with_timeout(Duration::from_secs(120))
226    ///     .initialize()
227    ///     .await?;
228    ///
229    ///   // Update existing scope (single field)
230    ///   service
231    ///     .scope(&scope)
232    ///     .await?
233    ///     .with_threshold(0.8)
234    ///     .update()
235    ///     .await?;
236    ///   Ok(())
237    /// }
238    /// ```
239    pub async fn scope(
240        &self,
241        scope: &Scope,
242    ) -> Result<ScopeConfigBuilderWrapper<Scope, S, E>, ConsensusError> {
243        let existing_config = self.storage.get_scope_config(scope).await?;
244        let builder = if let Some(config) = existing_config {
245            ScopeConfigBuilder::from_existing(config)
246        } else {
247            ScopeConfigBuilder::new()
248        };
249        Ok(ScopeConfigBuilderWrapper::new(
250            self.clone(),
251            scope.clone(),
252            builder,
253        ))
254    }
255
256    async fn initialize_scope(
257        &self,
258        scope: &Scope,
259        config: ScopeConfig,
260    ) -> Result<(), ConsensusError> {
261        config.validate()?;
262        self.storage.set_scope_config(scope, config).await
263    }
264
265    async fn update_scope_config<F>(&self, scope: &Scope, updater: F) -> Result<(), ConsensusError>
266    where
267        F: FnOnce(&mut ScopeConfig) -> Result<(), ConsensusError> + Send,
268    {
269        self.storage.update_scope_config(scope, updater).await
270    }
271
272    /// Resolve configuration for a proposal.
273    ///
274    /// Priority: proposal override > proposal fields (expiration_timestamp, liveness_criteria_yes)
275    ///   > scope config > global default
276    pub(crate) async fn resolve_config(
277        &self,
278        scope: &Scope,
279        proposal_override: Option<ConsensusConfig>,
280        proposal: Option<&Proposal>,
281    ) -> Result<ConsensusConfig, ConsensusError> {
282        // 1. If explicit config override exists, use it as base
283        // NOTE: if a per-proposal override is provided, we should not stomp its timeout
284        // from the proposal's expiration fields (the caller explicitly chose it).
285        let has_explicit_override = proposal_override.is_some();
286        let base_config = if let Some(override_config) = proposal_override {
287            override_config
288        } else if let Some(scope_config) = self.storage.get_scope_config(scope).await? {
289            ConsensusConfig::from(scope_config)
290        } else {
291            ConsensusConfig::gossipsub()
292        };
293
294        // 2. Apply proposal field overrides if proposal is provided
295        if let Some(prop) = proposal {
296            // Calculate timeout from expiration_timestamp (absolute timestamp) - timestamp (creation time),
297            // unless an explicit override was supplied.
298            let timeout_seconds = if has_explicit_override {
299                base_config.consensus_timeout()
300            } else if prop.expiration_timestamp > prop.timestamp {
301                Duration::from_secs(prop.expiration_timestamp - prop.timestamp)
302            } else {
303                base_config.consensus_timeout()
304            };
305
306            Ok(ConsensusConfig::new(
307                base_config.consensus_threshold(),
308                timeout_seconds,
309                base_config.max_rounds(),
310                base_config.use_gossipsub_rounds(),
311                prop.liveness_criteria_yes,
312            ))
313        } else {
314            Ok(base_config)
315        }
316    }
317
318    /// Handle the timeout for a proposal.
319    ///
320    /// First checks if consensus has already been reached and returns the result if so.
321    /// Otherwise, calculates consensus from current votes. If consensus is reached, marks
322    /// the session as ConsensusReached and returns the result. If no consensus, marks the
323    /// session as Failed and returns an error.
324    pub async fn handle_consensus_timeout(
325        &self,
326        scope: &Scope,
327        proposal_id: u32,
328    ) -> Result<bool, ConsensusError> {
329        let timeout_result: Result<Option<bool>, ConsensusError> = self
330            .update_session(scope, proposal_id, |session| {
331                if let ConsensusState::ConsensusReached(result) = session.state {
332                    return Ok(Some(result));
333                }
334
335                // Try to calculate consensus result first - if we have enough votes, return the result
336                // even if the proposal has technically expired
337                let result = calculate_consensus_result(
338                    &session.votes,
339                    session.proposal.expected_voters_count,
340                    session.config.consensus_threshold(),
341                    session.proposal.liveness_criteria_yes,
342                );
343
344                if let Some(result) = result {
345                    session.state = ConsensusState::ConsensusReached(result);
346                    Ok(Some(result))
347                } else {
348                    session.state = ConsensusState::Failed;
349                    Ok(None)
350                }
351            })
352            .await;
353
354        match timeout_result? {
355            Some(consensus_result) => {
356                self.emit_event(
357                    scope,
358                    ConsensusEvent::ConsensusReached {
359                        proposal_id,
360                        result: consensus_result,
361                        timestamp: current_timestamp()?,
362                    },
363                );
364                Ok(consensus_result)
365            }
366            None => {
367                self.emit_event(
368                    scope,
369                    ConsensusEvent::ConsensusFailed {
370                        proposal_id,
371                        timestamp: current_timestamp()?,
372                    },
373                );
374                Err(ConsensusError::InsufficientVotesAtTimeout)
375            }
376        }
377    }
378
379    pub(crate) async fn get_session(
380        &self,
381        scope: &Scope,
382        proposal_id: u32,
383    ) -> Result<ConsensusSession, ConsensusError> {
384        self.storage
385            .get_session(scope, proposal_id)
386            .await?
387            .ok_or(ConsensusError::SessionNotFound)
388    }
389
390    pub(crate) async fn update_session<R, F>(
391        &self,
392        scope: &Scope,
393        proposal_id: u32,
394        mutator: F,
395    ) -> Result<R, ConsensusError>
396    where
397        R: Send,
398        F: FnOnce(&mut ConsensusSession) -> Result<R, ConsensusError> + Send,
399    {
400        self.storage
401            .update_session(scope, proposal_id, mutator)
402            .await
403    }
404
405    pub(crate) async fn save_session(
406        &self,
407        scope: &Scope,
408        session: ConsensusSession,
409    ) -> Result<(), ConsensusError> {
410        self.storage.save_session(scope, session).await
411    }
412
413    pub(crate) async fn trim_scope_sessions(&self, scope: &Scope) -> Result<(), ConsensusError> {
414        self.storage
415            .update_scope_sessions(scope, |sessions| {
416                if sessions.len() <= self.max_sessions_per_scope {
417                    return Ok(());
418                }
419
420                sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
421                sessions.truncate(self.max_sessions_per_scope);
422                Ok(())
423            })
424            .await
425    }
426
427    pub(crate) async fn list_scope_sessions(
428        &self,
429        scope: &Scope,
430    ) -> Result<Vec<ConsensusSession>, ConsensusError> {
431        self.storage
432            .list_scope_sessions(scope)
433            .await?
434            .ok_or(ConsensusError::ScopeNotFound)
435    }
436
437    pub(crate) fn handle_transition(
438        &self,
439        scope: &Scope,
440        proposal_id: u32,
441        transition: SessionTransition,
442    ) {
443        if let SessionTransition::ConsensusReached(result) = transition {
444            self.emit_event(
445                scope,
446                ConsensusEvent::ConsensusReached {
447                    proposal_id,
448                    result,
449                    timestamp: current_timestamp().unwrap_or(0),
450                },
451            );
452        }
453    }
454
455    fn emit_event(&self, scope: &Scope, event: ConsensusEvent) {
456        self.event_bus.publish(scope.clone(), event);
457    }
458}
459
/// Wrapper around ScopeConfigBuilder that stores service and scope for convenience methods.
///
/// The wrapper forwards every builder method to the inner `ScopeConfigBuilder` and adds
/// `initialize()` / `update()`, which persist the built configuration through the service.
pub struct ScopeConfigBuilderWrapper<Scope, S, E>
where
    Scope: ConsensusScope,
    S: ConsensusStorage<Scope>,
    E: ConsensusEventBus<Scope>,
{
    /// Service used to persist the configuration.
    service: ConsensusService<Scope, S, E>,
    /// Scope whose configuration is being built.
    scope: Scope,
    /// Underlying builder holding the configuration under construction.
    builder: ScopeConfigBuilder,
}
471
472impl<Scope, S, E> ScopeConfigBuilderWrapper<Scope, S, E>
473where
474    Scope: ConsensusScope,
475    S: ConsensusStorage<Scope>,
476    E: ConsensusEventBus<Scope>,
477{
478    fn new(
479        service: ConsensusService<Scope, S, E>,
480        scope: Scope,
481        builder: ScopeConfigBuilder,
482    ) -> Self {
483        Self {
484            service,
485            scope,
486            builder,
487        }
488    }
489
490    /// Set network type (P2P or Gossipsub)
491    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
492        self.builder = self.builder.with_network_type(network_type);
493        self
494    }
495
496    /// Set consensus threshold (0.0 to 1.0)
497    pub fn with_threshold(mut self, threshold: f64) -> Self {
498        self.builder = self.builder.with_threshold(threshold);
499        self
500    }
501
502    /// Set default timeout for proposals (in seconds)
503    pub fn with_timeout(mut self, timeout: Duration) -> Self {
504        self.builder = self.builder.with_timeout(timeout);
505        self
506    }
507
508    /// Set liveness criteria (how silent peers are counted)
509    pub fn with_liveness_criteria(mut self, liveness_criteria_yes: bool) -> Self {
510        self.builder = self.builder.with_liveness_criteria(liveness_criteria_yes);
511        self
512    }
513
514    /// Override max rounds (if None, uses network_type defaults)
515    pub fn with_max_rounds(mut self, max_rounds: Option<u32>) -> Self {
516        self.builder = self.builder.with_max_rounds(max_rounds);
517        self
518    }
519
520    /// Use P2P preset with common defaults
521    pub fn p2p_preset(mut self) -> Self {
522        self.builder = self.builder.p2p_preset();
523        self
524    }
525
526    /// Use Gossipsub preset with common defaults
527    pub fn gossipsub_preset(mut self) -> Self {
528        self.builder = self.builder.gossipsub_preset();
529        self
530    }
531
532    /// Use strict consensus (higher threshold = 0.9)
533    pub fn strict_consensus(mut self) -> Self {
534        self.builder = self.builder.strict_consensus();
535        self
536    }
537
538    /// Use fast consensus (lower threshold = 0.6, shorter timeout = 30s)
539    pub fn fast_consensus(mut self) -> Self {
540        self.builder = self.builder.fast_consensus();
541        self
542    }
543
544    /// Start with network-specific defaults
545    pub fn with_network_defaults(mut self, network_type: NetworkType) -> Self {
546        self.builder = self.builder.with_network_defaults(network_type);
547        self
548    }
549
550    /// Initialize scope with the built configuration
551    pub async fn initialize(self) -> Result<(), ConsensusError> {
552        let config = self.builder.build()?;
553        self.service.initialize_scope(&self.scope, config).await
554    }
555
556    /// Update existing scope configuration with the built configuration
557    pub async fn update(self) -> Result<(), ConsensusError> {
558        let config = self.builder.build()?;
559        self.service
560            .update_scope_config(&self.scope, |existing| {
561                *existing = config;
562                Ok(())
563            })
564            .await
565    }
566
567    /// Get the current configuration (useful for testing)
568    pub fn get_config(&self) -> ScopeConfig {
569        self.builder.get_config()
570    }
571}