Skip to main content

miden_node_store/state/
subscription.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use miden_protocol::block::BlockNumber;
5use thiserror::Error;
6use tokio::sync::{mpsc, watch};
7use tokio_stream::Stream;
8use tokio_stream::wrappers::ReceiverStream;
9
10use super::{BlockCache, ProofCache, State};
11use crate::errors::DatabaseError;
12
13// SUBSCRIPTION EVENTS
14// ================================================================================================
15
/// A single item produced by a block subscription.
///
/// One event is emitted per block number, in ascending order, by `run_block_stream`.
#[derive(Debug)]
pub struct BlockSubscriptionEvent {
    /// Raw block bytes, taken from the block cache when present and otherwise loaded through
    /// `State::load_block`.
    pub block: Vec<u8>,
    /// Committed chain tip as read from the tip watch channel right after the block bytes were
    /// fetched, i.e. it may be ahead of the block carried by this event.
    pub committed_chain_tip: BlockNumber,
}
21
/// A single item produced by a proof subscription.
///
/// One event is emitted per block number, in ascending order, by `run_proof_stream`.
#[derive(Debug)]
pub struct ProofSubscriptionEvent {
    /// Number of the block the proof was fetched for.
    pub block_num: BlockNumber,
    /// Raw proof bytes, taken from the proof cache when present and otherwise loaded through
    /// `State::load_proof`.
    pub proof: Vec<u8>,
    /// Proven chain tip as read from the tip watch channel right after the proof bytes were
    /// fetched, i.e. it may be ahead of `block_num`.
    pub proven_chain_tip: BlockNumber,
}
28
/// Errors that terminate a block or proof subscription.
///
/// The stream task forwards the error as the last item it sends and then stops, so a subscriber
/// sees at most one `Err` per stream.
#[derive(Debug, Error)]
pub enum StateSubscriptionError {
    /// `State::load_block` returned a database error.
    #[error("failed to load block {block_num}")]
    BlockLoad {
        /// Block number that was being loaded.
        block_num: BlockNumber,
        /// Underlying database failure.
        #[source]
        source: DatabaseError,
    },
    /// The block was absent from the cache and `State::load_block` returned `None`.
    #[error("block {0} not found")]
    BlockNotFound(BlockNumber),
    /// `State::load_proof` returned a database error.
    #[error("failed to load proof for block {block_num}")]
    ProofLoad {
        /// Block number whose proof was being loaded.
        block_num: BlockNumber,
        /// Underlying database failure.
        #[source]
        source: DatabaseError,
    },
    /// The proof was absent from the cache and `State::load_proof` returned `None`.
    #[error("proof for block {0} not found")]
    ProofNotFound(BlockNumber),
}
48
/// Boxed, pinned, `Send` stream returned by `State::block_subscription`.
///
/// Each item is either the next block event or the error that ended the subscription.
pub type BlockSubscriptionStream =
    Pin<Box<dyn Stream<Item = Result<BlockSubscriptionEvent, StateSubscriptionError>> + Send>>;
51
/// Boxed, pinned, `Send` stream returned by `State::proof_subscription`.
///
/// Each item is either the next proof event or the error that ended the subscription.
pub type ProofSubscriptionStream =
    Pin<Box<dyn Stream<Item = Result<ProofSubscriptionEvent, StateSubscriptionError>> + Send>>;
54
55impl State {
56    /// Streams committed blocks starting from `from`, replaying historical blocks first and then
57    /// following live commits.
58    pub fn block_subscription(self: &Arc<Self>, from: BlockNumber) -> BlockSubscriptionStream {
59        Box::pin(build_block_stream(
60            from,
61            self.block_cache.clone(),
62            self.subscribe_committed_tip(),
63            Arc::clone(self),
64        ))
65    }
66
67    /// Streams block proofs starting from `from`, replaying historical proofs first and then
68    /// following newly proven blocks.
69    pub fn proof_subscription(self: &Arc<Self>, from: BlockNumber) -> ProofSubscriptionStream {
70        Box::pin(build_proof_stream(
71            from,
72            self.proof_cache.clone(),
73            self.subscribe_proven_tip(),
74            Arc::clone(self),
75        ))
76    }
77}
78
79// STREAM BUILDERS
80// ================================================================================================
81
82fn build_block_stream(
83    from: BlockNumber,
84    cache: BlockCache,
85    tip_rx: watch::Receiver<BlockNumber>,
86    state: Arc<State>,
87) -> impl Stream<Item = Result<BlockSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
88    let (tx, rx) = mpsc::channel(32);
89    tokio::spawn(async move {
90        if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await {
91            let _ = tx.send(Err(err)).await;
92        }
93    });
94    ReceiverStream::new(rx)
95}
96
97fn build_proof_stream(
98    from: BlockNumber,
99    cache: ProofCache,
100    tip_rx: watch::Receiver<BlockNumber>,
101    state: Arc<State>,
102) -> impl Stream<Item = Result<ProofSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
103    let (tx, rx) = mpsc::channel(32);
104    tokio::spawn(async move {
105        if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await {
106            let _ = tx.send(Err(err)).await;
107        }
108    });
109    ReceiverStream::new(rx)
110}
111
112// STREAM TASKS
113// ================================================================================================
114
115async fn run_block_stream(
116    from: BlockNumber,
117    cache: BlockCache,
118    mut tip_rx: watch::Receiver<BlockNumber>,
119    state: Arc<State>,
120    tx: &mpsc::Sender<Result<BlockSubscriptionEvent, StateSubscriptionError>>,
121) -> Result<(), StateSubscriptionError> {
122    let mut next = from;
123    loop {
124        let mut tip = *tip_rx.borrow_and_update();
125        while next <= tip {
126            let block = fetch_block(next, &cache, &state).await?;
127            tip = *tip_rx.borrow_and_update();
128            if tx
129                .send(Ok(BlockSubscriptionEvent { block, committed_chain_tip: tip }))
130                .await
131                .is_err()
132            {
133                return Ok(());
134            }
135            next = next.child();
136        }
137        if tip_rx.changed().await.is_err() {
138            return Ok(());
139        }
140    }
141}
142
143async fn run_proof_stream(
144    from: BlockNumber,
145    cache: ProofCache,
146    mut tip_rx: watch::Receiver<BlockNumber>,
147    state: Arc<State>,
148    tx: &mpsc::Sender<Result<ProofSubscriptionEvent, StateSubscriptionError>>,
149) -> Result<(), StateSubscriptionError> {
150    let mut next = from;
151    loop {
152        let mut tip = *tip_rx.borrow_and_update();
153        while next <= tip {
154            let proof = fetch_proof(next, &cache, &state).await?;
155            tip = *tip_rx.borrow_and_update();
156            if tx
157                .send(Ok(ProofSubscriptionEvent {
158                    block_num: next,
159                    proof,
160                    proven_chain_tip: tip,
161                }))
162                .await
163                .is_err()
164            {
165                return Ok(());
166            }
167            next = next.child();
168        }
169        if tip_rx.changed().await.is_err() {
170            return Ok(());
171        }
172    }
173}
174
175async fn fetch_block(
176    block_num: BlockNumber,
177    cache: &BlockCache,
178    state: &State,
179) -> Result<Vec<u8>, StateSubscriptionError> {
180    if let Some(entry) = cache.get(block_num) {
181        return Ok(entry.block_bytes().to_vec());
182    }
183    state
184        .load_block(block_num)
185        .await
186        .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })?
187        .ok_or(StateSubscriptionError::BlockNotFound(block_num))
188}
189
190async fn fetch_proof(
191    block_num: BlockNumber,
192    cache: &ProofCache,
193    state: &State,
194) -> Result<Vec<u8>, StateSubscriptionError> {
195    if let Some(entry) = cache.get(block_num) {
196        return Ok(entry.proof_bytes().to_vec());
197    }
198    state
199        .load_proof(block_num)
200        .await
201        .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })?
202        .ok_or(StateSubscriptionError::ProofNotFound(block_num))
203}