Skip to main content

arc_malachitebft_sync/
effect.rs

1use std::{marker::PhantomData, ops::RangeInclusive};
2
3use derive_where::derive_where;
4use thiserror::Error;
5
6use malachitebft_core_types::Context;
7use malachitebft_peer::PeerId;
8
9use crate::{InboundRequestId, OutboundRequestId, ValueRequest, ValueResponse};
10
11/// Provides a way to construct the appropriate [`Resume`] value to
12/// resume execution after handling an [`Effect`].
13///
14/// Each `Effect` embeds a value that implements [`Resumable`]
15/// which is used to construct the appropriate [`Resume`] value.
16///
17/// ## Example
18///
19/// ```rust,ignore
20/// fn effect_handler(effect: Effect<Ctx>) -> Result<Resume<Ctx>, Error> {
21///     match effect {
22///         Effect::ResetTimeouts(r) => {
23///             reset_timeouts();
24///             Ok(r.resume_with(()))
25///         }
26///         Effect::GetValidatorSet(height, r) => {
27///             let validator_set = get_validator_set(height);
28///             Ok(r.resume_with(validator_set))
29///         }
30///        // ...
31///     }
32/// }
33/// ```
34pub trait Resumable<Ctx: Context> {
35    /// The value type that will be used to resume execution
36    type Value;
37
38    /// Creates the appropriate [`Resume`] value to resume execution with.
39    fn resume_with(self, value: Self::Value) -> Resume<Ctx>;
40}
41
42#[derive_where(Debug)]
43#[derive(Error)]
44pub enum Error<Ctx: Context> {
45    /// The coroutine was resumed with a value which
46    /// does not match the expected type of resume value.
47    #[error("Unexpected resume: {0:?}, expected one of: {1}")]
48    UnexpectedResume(Resume<Ctx>, &'static str),
49}
50
51#[derive_where(Debug)]
52pub enum Resume<Ctx: Context> {
53    Continue(PhantomData<Ctx>),
54    ValueRequestId(Option<OutboundRequestId>),
55}
56
57impl<Ctx: Context> Default for Resume<Ctx> {
58    fn default() -> Self {
59        Self::Continue(PhantomData)
60    }
61}
62
63#[derive_where(Debug)]
64pub enum Effect<Ctx: Context> {
65    /// Broadcast our status to our direct peers
66    BroadcastStatus(Ctx::Height, resume::Continue),
67
68    /// Send a ValueSync request to a peer
69    SendValueRequest(PeerId, ValueRequest<Ctx>, resume::ValueRequestId),
70
71    /// Send a response to a ValueSync request
72    SendValueResponse(InboundRequestId, ValueResponse<Ctx>, resume::Continue),
73
74    /// Retrieve a range of values from the application
75    GetDecidedValues(
76        InboundRequestId,
77        RangeInclusive<Ctx::Height>,
78        resume::Continue,
79    ),
80
81    /// Tell consensus to process the sync response
82    ProcessValueResponse(
83        PeerId,
84        OutboundRequestId,
85        ValueResponse<Ctx>,
86        resume::Continue,
87    ),
88}
89
90pub mod resume {
91
92    use super::*;
93
94    #[derive(Debug, Default)]
95    pub struct Continue;
96
97    impl<Ctx: Context> Resumable<Ctx> for Continue {
98        type Value = ();
99
100        fn resume_with(self, _: ()) -> Resume<Ctx> {
101            Resume::default()
102        }
103    }
104
105    #[derive(Debug, Default)]
106    pub struct ValueRequestId;
107
108    impl<Ctx: Context> Resumable<Ctx> for ValueRequestId {
109        type Value = Option<OutboundRequestId>;
110
111        fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
112            Resume::ValueRequestId(value)
113        }
114    }
115}