miden_node_store/state/
subscription.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use miden_protocol::block::BlockNumber;
5use thiserror::Error;
6use tokio::sync::{mpsc, watch};
7use tokio_stream::Stream;
8use tokio_stream::wrappers::ReceiverStream;
9
10use super::{BlockCache, ProofCache, State};
11use crate::errors::DatabaseError;
12
13#[derive(Debug)]
17pub struct BlockSubscriptionEvent {
18 pub block: Vec<u8>,
19 pub committed_chain_tip: BlockNumber,
20}
21
22#[derive(Debug)]
23pub struct ProofSubscriptionEvent {
24 pub block_num: BlockNumber,
25 pub proof: Vec<u8>,
26 pub proven_chain_tip: BlockNumber,
27}
28
29#[derive(Debug, Error)]
30pub enum StateSubscriptionError {
31 #[error("failed to load block {block_num}")]
32 BlockLoad {
33 block_num: BlockNumber,
34 #[source]
35 source: DatabaseError,
36 },
37 #[error("block {0} not found")]
38 BlockNotFound(BlockNumber),
39 #[error("failed to load proof for block {block_num}")]
40 ProofLoad {
41 block_num: BlockNumber,
42 #[source]
43 source: DatabaseError,
44 },
45 #[error("proof for block {0} not found")]
46 ProofNotFound(BlockNumber),
47}
48
49pub type BlockSubscriptionStream =
50 Pin<Box<dyn Stream<Item = Result<BlockSubscriptionEvent, StateSubscriptionError>> + Send>>;
51
52pub type ProofSubscriptionStream =
53 Pin<Box<dyn Stream<Item = Result<ProofSubscriptionEvent, StateSubscriptionError>> + Send>>;
54
55impl State {
56 pub fn block_subscription(self: &Arc<Self>, from: BlockNumber) -> BlockSubscriptionStream {
59 Box::pin(build_block_stream(
60 from,
61 self.block_cache.clone(),
62 self.subscribe_committed_tip(),
63 Arc::clone(self),
64 ))
65 }
66
67 pub fn proof_subscription(self: &Arc<Self>, from: BlockNumber) -> ProofSubscriptionStream {
70 Box::pin(build_proof_stream(
71 from,
72 self.proof_cache.clone(),
73 self.subscribe_proven_tip(),
74 Arc::clone(self),
75 ))
76 }
77}
78
79fn build_block_stream(
83 from: BlockNumber,
84 cache: BlockCache,
85 tip_rx: watch::Receiver<BlockNumber>,
86 state: Arc<State>,
87) -> impl Stream<Item = Result<BlockSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
88 let (tx, rx) = mpsc::channel(32);
89 tokio::spawn(async move {
90 if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await {
91 let _ = tx.send(Err(err)).await;
92 }
93 });
94 ReceiverStream::new(rx)
95}
96
97fn build_proof_stream(
98 from: BlockNumber,
99 cache: ProofCache,
100 tip_rx: watch::Receiver<BlockNumber>,
101 state: Arc<State>,
102) -> impl Stream<Item = Result<ProofSubscriptionEvent, StateSubscriptionError>> + Send + 'static {
103 let (tx, rx) = mpsc::channel(32);
104 tokio::spawn(async move {
105 if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await {
106 let _ = tx.send(Err(err)).await;
107 }
108 });
109 ReceiverStream::new(rx)
110}
111
112async fn run_block_stream(
116 from: BlockNumber,
117 cache: BlockCache,
118 mut tip_rx: watch::Receiver<BlockNumber>,
119 state: Arc<State>,
120 tx: &mpsc::Sender<Result<BlockSubscriptionEvent, StateSubscriptionError>>,
121) -> Result<(), StateSubscriptionError> {
122 let mut next = from;
123 loop {
124 let mut tip = *tip_rx.borrow_and_update();
125 while next <= tip {
126 let block = fetch_block(next, &cache, &state).await?;
127 tip = *tip_rx.borrow_and_update();
128 if tx
129 .send(Ok(BlockSubscriptionEvent { block, committed_chain_tip: tip }))
130 .await
131 .is_err()
132 {
133 return Ok(());
134 }
135 next = next.child();
136 }
137 if tip_rx.changed().await.is_err() {
138 return Ok(());
139 }
140 }
141}
142
143async fn run_proof_stream(
144 from: BlockNumber,
145 cache: ProofCache,
146 mut tip_rx: watch::Receiver<BlockNumber>,
147 state: Arc<State>,
148 tx: &mpsc::Sender<Result<ProofSubscriptionEvent, StateSubscriptionError>>,
149) -> Result<(), StateSubscriptionError> {
150 let mut next = from;
151 loop {
152 let mut tip = *tip_rx.borrow_and_update();
153 while next <= tip {
154 let proof = fetch_proof(next, &cache, &state).await?;
155 tip = *tip_rx.borrow_and_update();
156 if tx
157 .send(Ok(ProofSubscriptionEvent {
158 block_num: next,
159 proof,
160 proven_chain_tip: tip,
161 }))
162 .await
163 .is_err()
164 {
165 return Ok(());
166 }
167 next = next.child();
168 }
169 if tip_rx.changed().await.is_err() {
170 return Ok(());
171 }
172 }
173}
174
175async fn fetch_block(
176 block_num: BlockNumber,
177 cache: &BlockCache,
178 state: &State,
179) -> Result<Vec<u8>, StateSubscriptionError> {
180 if let Some(entry) = cache.get(block_num) {
181 return Ok(entry.block_bytes().to_vec());
182 }
183 state
184 .load_block(block_num)
185 .await
186 .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })?
187 .ok_or(StateSubscriptionError::BlockNotFound(block_num))
188}
189
190async fn fetch_proof(
191 block_num: BlockNumber,
192 cache: &ProofCache,
193 state: &State,
194) -> Result<Vec<u8>, StateSubscriptionError> {
195 if let Some(entry) = cache.get(block_num) {
196 return Ok(entry.proof_bytes().to_vec());
197 }
198 state
199 .load_proof(block_num)
200 .await
201 .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })?
202 .ok_or(StateSubscriptionError::ProofNotFound(block_num))
203}