1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#![allow(clippy::collapsible_if)]
use itertools::Itertools;
use crate::{errors::ClientResult, BlockHeight, ClientError, HasBlockInfo, MoreInfo};
use super::{Client, Update};
// Number of blocks the reorg cache is expected to hold; the methods in this file compare the
// cache length against this value before fetching and when evicting cached blocks.
//
// NOTE: one block should always be kept in the cache so that its meta (etc.) stays available.
// That holds as long as you use `fetch_updates_from_reorgs`.
// NOTE(review): no function named `fetch_updates_from_reorgs` is visible in this file —
// presumably this now refers to `fetch_updates_from_cache`; confirm.
pub const REORG_CACHE_SIZE: usize = 31;
impl<T: HasBlockInfo> Client<T> {
/// for docs check `fetch_updates_from_cache_starting_from`
/// this is same function but with start_height = 0
pub async fn fetch_updates_from_cache(&self) -> ClientResult<Vec<Update<T>>> {
self.fetch_updates_from_cache_starting_from(0).await
}
/// Fetches new updates, using the reorg cache as the source of already known blocks.
///
/// Almost everything documented on `fetch_updates` applies here, with these differences:
/// 1. The reorg cache is used, and its blocks must be marked as processed
///    (see `mark_as_processed`); otherwise this function returns an error.
/// 2. Every returned `Update::RemoveBlock` carries `block: Some(..)`: the block is read from
///    the cache right before it is removed, which lets you handle the reorg without storing
///    blocks yourself.
/// 3. `reorg_cache` must be set, otherwise this function returns an error.
///
/// Why blocks have to be marked as processed: when the program restarts or a thread dies
/// while the cache is kept, this is the only way to know which cached blocks have already
/// been handled and which have not.
///
/// `start_height` is used only when the cache holds no blocks: fetching then begins from the
/// meta of block `start_height - 1` (saturating at 0).
///
/// # Errors
/// - `ClientError::CacheNotInited` if `reorg_cache` is not set.
/// - `ClientError::CacheNotProcessed` (carrying the current cache length) if the cache holds
///   more than `REORG_CACHE_SIZE` blocks.
/// - Any error produced by the cache, by `get_electrs_block_meta` or by `fetch_updates`.
pub async fn fetch_updates_from_cache_starting_from(
    &self,
    start_height: BlockHeight,
) -> ClientResult<Vec<Update<T>>> {
    let Some(cache) = self.reorg_cache.as_ref() else {
        return Err(ClientError::CacheNotInited);
    };

    let cache_len = cache.len().await;
    if cache_len > REORG_CACHE_SIZE {
        return Err(ClientError::CacheNotProcessed(cache_len));
    }

    // Metas of every cached block are the known chain state handed to `fetch_updates`.
    let mut metas = Vec::new();
    for height in cache.items().await {
        metas.push(cache.read_cache(height).await?.block);
    }

    // Empty cache: fall back to the block right before `start_height` so that fetching
    // can start from the requested height.
    if metas.is_empty() {
        metas.push(
            self.get_electrs_block_meta(start_height.saturating_sub(1))
                .await?,
        );
    }

    // Updates are applied strictly in the order `fetch_updates` produced them. The previous
    // implementation grouped them into batches and flattened the batches right away, which
    // yields exactly the same sequence, so the grouping step was dropped.
    let updates = self.fetch_updates(&metas).await.err_log()?;

    let mut res = Vec::new();
    for update in updates {
        match update {
            Update::AddBlock(added) => {
                cache.add(added.block.height, &added).await.err_log()?;

                // Evict only when the cache is over its limit and the first cached height
                // (presumably the oldest one — `items()` ordering is not visible here) has
                // already been processed.
                let items = cache.items().await;
                if items.len() > REORG_CACHE_SIZE {
                    let last_processed = cache.read_last_processed().await.err_log()?;
                    if last_processed >= *items.first().unwrap_or(&0) {
                        cache.remove_oldest().await?;
                    }
                }

                res.push(Update::AddBlock(added));
            }
            Update::RemoveBlock { height, .. } => {
                // Read the block before removing it so the caller receives it with the update.
                let block = cache.read_cache(height).await.err_log()?;
                cache.remove(height).await.err_log()?;
                res.push(Update::RemoveBlock {
                    height,
                    block: Some(block),
                });
            }
        }
    }

    Ok(res)
}
/// marks cache block as processed and removes cache file if it exceeds reorg buffer
/// for the reason why it shold bother you go to `fetch_updates_from_cache_starting_from` docs
pub async fn mark_as_processed(&self, height: BlockHeight) -> ClientResult<()> {
if let Some(ref cache) = self.reorg_cache {
if cache.len().await > REORG_CACHE_SIZE {
cache.remove(height).await.err_log()?;
}
cache.write_last_processed(height).await.err_log()?;
};
Ok(())
}
/// returns vec of cached blocks which not processed as updates to handle
pub async fn get_unprocessed_blocks_as_updates(&self) -> ClientResult<Vec<Update<T>>> {
if let Some(ref cache) = self.reorg_cache {
let mut res = Vec::new();
for i in self.get_unprocessed().await? {
res.push(Update::AddBlock(cache.read_cache(i).await?));
}
Ok(res)
} else {
Ok(vec![])
}
}
/// returns vec of cached blocks which not processed
pub async fn get_unprocessed(&self) -> ClientResult<Vec<BlockHeight>> {
if let Some(ref cache) = self.reorg_cache {
let lp = cache.read_last_processed().await.err_log()?;
return Ok(cache
.items()
.await
.into_iter()
.filter(|v| *v > lp)
.collect_vec());
};
Ok(vec![])
}
/// returns vec of cached blocks
pub async fn get_cached(&self) -> ClientResult<Vec<BlockHeight>> {
if let Some(ref cache) = self.reorg_cache {
return Ok(cache.items().await);
};
Ok(vec![])
}
pub async fn clean_cache(&self) -> ClientResult<()> {
if let Some(ref cache) = self.reorg_cache {
cache.clean_all()?;
}
Ok(())
}
}