1use anyhow::bail;
2use essential_lock::StdLock;
3use essential_state_read_vm::StateRead;
4use essential_storage::{
5 failed_solution::{CheckOutcome, FailedSolution, SolutionFailReason, SolutionOutcomes},
6 key_range, CommitData, QueryState, StateStorage, Storage,
7};
8use essential_types::{
9 contract::{Contract, SignedContract},
10 predicate::Predicate,
11 solution::Solution,
12 ContentAddress, Hash, Key, PredicateAddress, Signature, Word,
13};
14use futures::{future::FutureExt, StreamExt};
15use std::{
16 collections::{BTreeMap, HashMap, HashSet},
17 pin::Pin,
18 sync::Arc,
19 time::{Duration, SystemTime, UNIX_EPOCH},
20};
21use thiserror::Error;
22
23mod values;
24
25const PAGE_SIZE: usize = 100;
27
28#[derive(Clone)]
29pub struct MemoryStorage {
30 inner: Arc<StdLock<Inner>>,
31 streams: essential_storage::streams::Notify,
32}
33
34impl Default for MemoryStorage {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40#[derive(Default, Debug)]
41struct Inner {
42 contracts: HashMap<ContentAddress, ContractWithAddresses>,
43 predicates: HashMap<ContentAddress, Predicate>,
44 contract_time_index: BTreeMap<Duration, Vec<ContentAddress>>,
45 solution_pool: HashSet<Hash>,
46 solution_time_index: BTreeMap<Duration, Vec<Hash>>,
47 failed_solution_pool: HashMap<Hash, Vec<(SolutionFailReason, Duration)>>,
48 failed_solution_time_index: BTreeMap<Duration, Vec<Hash>>,
49 solutions: HashMap<Hash, Solution>,
50 solved: BTreeMap<Duration, Block>,
52 block_number_index: HashMap<u64, Duration>,
53 solution_block_time_index: HashMap<Hash, Vec<Duration>>,
54 state: HashMap<ContentAddress, BTreeMap<Key, Vec<Word>>>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
58struct Block {
59 number: u64,
60 timestamp: Duration,
61 hashes: Vec<Hash>,
62}
63
64#[derive(Debug)]
65struct ContractWithAddresses {
66 salt: Hash,
67 data: HashSet<ContentAddress>,
68 signature: Signature,
69}
70
71impl ContractWithAddresses {
72 fn predicate_addrs(&self) -> Vec<&ContentAddress> {
74 let mut addrs: Vec<_> = self.data.iter().collect();
75 addrs.sort();
76 addrs
77 }
78
79 fn predicates_owned(&self, predicates: &HashMap<ContentAddress, Predicate>) -> Vec<Predicate> {
81 self.predicate_addrs()
82 .into_iter()
83 .filter_map(|addr| predicates.get(addr).cloned())
84 .collect()
85 }
86
87 fn signed_contract(&self, predicates: &HashMap<ContentAddress, Predicate>) -> SignedContract {
91 let signature = self.signature.clone();
92 let predicates = self.predicates_owned(predicates);
93 SignedContract {
94 contract: Contract {
95 salt: self.salt,
96 predicates,
97 },
98 signature,
99 }
100 }
101}
102
103impl MemoryStorage {
104 pub fn new() -> Self {
105 Self {
106 inner: Arc::new(StdLock::new(Inner::default())),
107 streams: essential_storage::streams::Notify::new(),
108 }
109 }
110}
111
112impl StateStorage for MemoryStorage {
113 async fn update_state(
114 &self,
115 address: &ContentAddress,
116 key: &Key,
117 value: Vec<Word>,
118 ) -> anyhow::Result<Vec<Word>> {
119 self.inner.apply(|i| {
120 let Some(map) = i.state.get_mut(address) else {
121 bail!("No state for address, {:?}", address);
122 };
123 let v = if value.is_empty() {
124 map.remove(key)
125 } else {
126 map.insert(key.clone(), value)
127 };
128 let v = v.unwrap_or_default();
129 Ok(v)
130 })
131 }
132
133 async fn update_state_batch<U>(&self, updates: U) -> anyhow::Result<Vec<Vec<Word>>>
134 where
135 U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)> + Send,
136 {
137 let v = self.inner.apply(|i| update_state_batch(i, updates));
138 Ok(v)
139 }
140}
141
142impl QueryState for MemoryStorage {
143 async fn query_state(&self, address: &ContentAddress, key: &Key) -> anyhow::Result<Vec<Word>> {
144 let v = self.inner.apply(|i| {
145 let map = i.state.get(address)?;
146 let v = map.get(key)?;
147 Some(v.clone())
148 });
149 Ok(v.unwrap_or_default())
150 }
151}
152
153impl Storage for MemoryStorage {
154 async fn insert_contract(&self, signed: SignedContract) -> anyhow::Result<()> {
155 let SignedContract {
156 contract,
157 signature,
158 } = signed;
159
160 let salt = contract.salt;
161
162 let data: HashMap<_, _> = contract
163 .predicates
164 .into_iter()
165 .map(|p| (essential_hash::content_addr(&p), p))
166 .collect();
167
168 let contract_addr =
169 essential_hash::contract_addr::from_predicate_addrs(data.keys().cloned(), &salt);
170
171 let contract_with_addrs = ContractWithAddresses {
172 salt,
173 data: data.keys().cloned().collect(),
174 signature,
175 };
176 let time = SystemTime::now().duration_since(UNIX_EPOCH)?;
177 let r = self.inner.apply(|i| {
178 i.predicates.extend(data);
179 let contains = i
180 .contracts
181 .insert(contract_addr.clone(), contract_with_addrs);
182 if contains.is_none() {
183 i.contract_time_index
184 .entry(time)
185 .or_default()
186 .push(contract_addr.clone());
187 }
188 i.state.entry(contract_addr).or_default();
189 Ok(())
190 });
191
192 self.streams.notify_new_contracts();
194 r
195 }
196
197 async fn insert_solution_into_pool(&self, solution: Solution) -> anyhow::Result<()> {
198 let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
199 let hash = essential_hash::hash(&solution);
200 self.inner.apply(|i| {
201 if i.solution_pool.insert(hash) {
202 i.solution_time_index
203 .entry(timestamp)
204 .or_default()
205 .push(hash);
206 }
207 i.solutions.insert(hash, solution);
208 });
209 Ok(())
210 }
211
212 async fn move_solutions_to_solved(
213 &self,
214 block_number: u64,
215 block_timestamp: Duration,
216 solutions: &[Hash],
217 ) -> anyhow::Result<()> {
218 let new_block = !solutions.is_empty();
219 let hashes: HashSet<_> = solutions.iter().collect();
220 let r = self.inner.apply(|i| {
221 move_solutions_to_solved(i, block_number, block_timestamp, solutions, hashes)
222 });
223
224 if new_block {
225 self.streams.notify_new_blocks();
227 }
228 r
229 }
230
231 async fn move_solutions_to_failed(
232 &self,
233 solutions: &[(Hash, SolutionFailReason)],
234 ) -> anyhow::Result<()> {
235 let hashes: HashSet<_> = solutions.iter().map(|(h, _)| h).collect();
236 self.inner
237 .apply(|i| move_solutions_to_failed(i, solutions, hashes))
238 }
239
240 async fn get_predicate(&self, address: &PredicateAddress) -> anyhow::Result<Option<Predicate>> {
241 let v = self.inner.apply(|i| {
242 if i.contracts
243 .get(&address.contract)
244 .map_or(true, |c| !c.data.contains(&address.predicate))
245 {
246 return None;
247 }
248 let predicate = i.predicates.get(&address.predicate)?;
249 Some(predicate.clone())
250 });
251 Ok(v)
252 }
253
254 async fn get_contract(
255 &self,
256 address: &ContentAddress,
257 ) -> anyhow::Result<Option<SignedContract>> {
258 let v = self
259 .inner
260 .apply(|i| Some(i.contracts.get(address)?.signed_contract(&i.predicates)));
261 Ok(v)
262 }
263
264 async fn list_contracts(
265 &self,
266 time_range: Option<std::ops::Range<std::time::Duration>>,
267 page: Option<usize>,
268 ) -> anyhow::Result<Vec<Contract>> {
269 let page = page.unwrap_or(0);
270 match time_range {
271 Some(range) => {
272 let v = self.inner.apply(|i| {
273 values::page_contract_by_time(
274 &i.contract_time_index,
275 &i.contracts,
276 &i.predicates,
277 range,
278 page,
279 PAGE_SIZE,
280 )
281 });
282 Ok(v)
283 }
284 None => {
285 let v = self.inner.apply(|i| {
286 values::page_contract(
287 i.contract_time_index.values().flatten(),
288 &i.contracts,
289 &i.predicates,
290 page,
291 PAGE_SIZE,
292 )
293 });
294 Ok(v)
295 }
296 }
297 }
298
299 fn subscribe_contracts(
300 self,
301 start_time: Option<Duration>,
302 start_page: Option<usize>,
303 ) -> impl futures::Stream<Item = anyhow::Result<Contract>> + Send + 'static {
304 let new_contracts = self.streams.subscribe_contracts();
305 let init = essential_storage::streams::StreamState::new(start_page, start_time, None);
306 futures::stream::unfold(init, move |state| {
307 let storage = self.clone();
308 essential_storage::streams::next_data(
309 new_contracts.clone(),
310 state,
311 PAGE_SIZE,
312 move |get| {
315 let storage = storage.clone();
316 async move {
317 storage
318 .list_contracts(get.time.map(|s| s..Duration::MAX), Some(get.page))
319 .await
320 }
321 },
322 )
323 })
324 .flat_map(futures::stream::iter)
325 }
326
327 async fn list_solutions_pool(&self, page: Option<usize>) -> anyhow::Result<Vec<Solution>> {
328 Ok(self.inner.apply(|i| {
329 let iter = i
330 .solution_time_index
331 .values()
332 .flatten()
333 .filter(|h| i.solution_pool.contains(*h));
334 values::page_solutions(
335 iter,
336 |h| i.solutions.get(h).cloned(),
337 page.unwrap_or(0),
338 PAGE_SIZE,
339 )
340 }))
341 }
342
343 async fn list_failed_solutions_pool(
344 &self,
345 page: Option<usize>,
346 ) -> anyhow::Result<Vec<FailedSolution>> {
347 Ok(self.inner.apply(|i| {
348 let iter = i.failed_solution_time_index.values().flat_map(|hashes| {
349 hashes.iter().flat_map(|h| {
350 i.failed_solution_pool
351 .get(h)
352 .into_iter()
353 .flatten()
354 .map(|r| (*h, r.clone()))
355 })
356 });
357 values::page_solutions(
358 iter,
359 |(h, r)| {
360 let solution = i.solutions.get(&h).cloned()?;
361 Some(FailedSolution {
362 solution,
363 reason: r.0,
364 })
365 },
366 page.unwrap_or(0),
367 PAGE_SIZE,
368 )
369 }))
370 }
371
372 async fn list_blocks(
373 &self,
374 time_range: Option<std::ops::Range<std::time::Duration>>,
375 block_number: Option<u64>,
376 page: Option<usize>,
377 ) -> anyhow::Result<Vec<essential_types::Block>> {
378 let page = page.unwrap_or(0);
379 self.inner.apply(|i| {
380 values::page_blocks(
381 &i.solved,
382 &i.solutions,
383 &i.block_number_index,
384 time_range,
385 block_number,
386 page,
387 PAGE_SIZE,
388 )
389 })
390 }
391
392 fn subscribe_blocks(
393 self,
394 start_time: Option<Duration>,
395 block_number: Option<u64>,
396 start_page: Option<usize>,
397 ) -> impl futures::Stream<Item = anyhow::Result<essential_types::Block>> + Send + 'static {
398 let new_blocks = self.streams.subscribe_blocks();
399 let init =
400 essential_storage::streams::StreamState::new(start_page, start_time, block_number);
401 futures::stream::unfold(init, move |state| {
402 let storage = self.clone();
403 essential_storage::streams::next_data(
404 new_blocks.clone(),
405 state,
406 PAGE_SIZE,
407 move |get| {
410 let storage = storage.clone();
411 async move {
412 storage
413 .list_blocks(
414 get.time.map(|s| s..Duration::MAX),
415 get.number,
416 Some(get.page),
417 )
418 .await
419 }
420 },
421 )
422 })
423 .flat_map(futures::stream::iter)
424 }
425
426 async fn get_solution(&self, solution_hash: Hash) -> anyhow::Result<Option<SolutionOutcomes>> {
427 let r = self.inner.apply(|i| {
428 i.solutions.get(&solution_hash).cloned().map(|s| {
429 let mut outcomes: Vec<_> = i
430 .failed_solution_pool
431 .get(&solution_hash)
432 .into_iter()
433 .flatten()
434 .cloned()
435 .map(|(r, t)| (t, CheckOutcome::Fail(r)))
436 .chain(
437 i.solution_block_time_index
438 .get(&solution_hash)
439 .into_iter()
440 .flatten()
441 .filter_map(|time| {
442 let b = i.solved.get(time)?;
443 Some((*time, CheckOutcome::Success(b.number)))
444 }),
445 )
446 .collect();
447 outcomes.sort_by_key(|(t, _)| *t);
448 let outcome = outcomes.into_iter().map(|(_, o)| o).collect();
449 SolutionOutcomes {
450 solution: s.clone(),
451 outcome,
452 }
453 })
454 });
455 Ok(r)
456 }
457
458 async fn prune_failed_solutions(&self, older_than: Duration) -> anyhow::Result<()> {
459 self.inner.apply(|i| {
460 i.failed_solution_time_index.retain(|timestamp, hash| {
461 let retain = *timestamp >= older_than;
462 if !retain {
463 for hash in hash {
464 i.failed_solution_pool.remove(hash);
465 }
466 }
467 retain
468 });
469 Ok(())
470 })
471 }
472
473 fn commit_block(
474 &self,
475 data: CommitData,
476 ) -> impl std::future::Future<Output = anyhow::Result<()>> + Send {
477 let CommitData {
478 failed,
479 solved,
480 state_updates,
481 block_number,
482 block_timestamp,
483 } = data;
484 let hashes: HashSet<_> = failed.iter().map(|(h, _)| h).collect();
485 let solved_hashes: HashSet<_> = solved.iter().collect();
486 let r = self.inner.apply(|i| {
487 let new_block = !solved_hashes.is_empty();
488 move_solutions_to_failed(i, failed, hashes)?;
489 move_solutions_to_solved(i, block_number, block_timestamp, solved, solved_hashes)?;
490 update_state_batch(i, state_updates);
491 Ok(new_block)
492 });
493
494 if let Ok(r) = &r {
495 if *r {
496 self.streams.notify_new_blocks();
498 }
499 }
500
501 async { r.map(|_| ()) }
502 }
503
504 async fn get_latest_block(&self) -> anyhow::Result<Option<essential_types::Block>> {
505 let r = self.inner.apply(|i| match i.solved.last_key_value() {
506 Some((_, block)) => {
507 let solutions = block
508 .hashes
509 .iter()
510 .map(|h| i.solutions.get(h).cloned())
511 .collect::<Option<Vec<_>>>()?;
512 Some(essential_types::Block {
513 number: block.number,
514 timestamp: block.timestamp,
515 solutions,
516 })
517 }
518 None => None,
519 });
520 Ok(r)
521 }
522}
523
524fn move_solutions_to_failed(
525 i: &mut Inner,
526 solutions: &[(Hash, SolutionFailReason)],
527 hashes: HashSet<&Hash>,
528) -> Result<(), anyhow::Error> {
529 let time = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
530 let solutions = solutions.iter().filter_map(|(h, r)| {
531 if i.solution_pool.remove(h) {
532 Some((*h, r.clone()))
533 } else {
534 None
535 }
536 });
537
538 for v in i.solution_time_index.values_mut() {
539 v.retain(|h| !hashes.contains(h));
540 }
541 i.solution_time_index.retain(|_, v| !v.is_empty());
542
543 for (hash, reason) in solutions {
544 i.failed_solution_pool
545 .entry(hash)
546 .or_default()
547 .push((reason, time));
548 i.failed_solution_time_index
549 .entry(time)
550 .or_default()
551 .push(hash);
552 }
553
554 Ok(())
555}
556
557fn move_solutions_to_solved(
558 i: &mut Inner,
559 block_number: u64,
560 block_timestamp: Duration,
561 solutions: &[Hash],
562 hashes: HashSet<&Hash>,
563) -> Result<(), anyhow::Error> {
564 if solutions.is_empty() {
565 return Ok(());
566 }
567
568 if solutions.iter().all(|s| !i.solution_pool.contains(s)) {
569 return Ok(());
570 }
571
572 if i.solved.contains_key(&block_timestamp) {
573 bail!("Two blocks created at the same time");
574 }
575
576 for v in i.solution_time_index.values_mut() {
577 v.retain(|h| !hashes.contains(h));
578 }
579 i.solution_time_index.retain(|_, v| !v.is_empty());
580
581 for hash in solutions {
582 i.solution_block_time_index
583 .entry(*hash)
584 .or_default()
585 .push(block_timestamp);
586 }
587 let solutions = solutions
588 .iter()
589 .filter(|h| i.solution_pool.remove(*h))
590 .cloned()
591 .collect();
592
593 let block = Block {
594 number: block_number,
595 timestamp: block_timestamp,
596 hashes: solutions,
597 };
598 i.solved.insert(block_timestamp, block);
599 i.block_number_index.insert(block_number, block_timestamp);
600 Ok(())
601}
602
603fn update_state_batch<U>(i: &mut Inner, updates: U) -> Vec<Vec<i64>>
604where
605 U: IntoIterator<Item = (ContentAddress, Key, Vec<Word>)>,
606{
607 updates
608 .into_iter()
609 .map(|(address, key, value)| {
610 let map = i.state.entry(address).or_default();
611 let v = if value.is_empty() {
612 map.remove(&key)
613 } else {
614 map.insert(key, value)
615 };
616 v.unwrap_or_default()
617 })
618 .collect()
619}
620
621#[derive(Debug, Error)]
622pub enum MemoryStorageError {
623 #[error("failed to read from memory storage")]
624 ReadError(#[from] anyhow::Error),
625 #[error("invalid key range")]
626 KeyRangeError,
627}
628
629impl StateRead for MemoryStorage {
630 type Error = MemoryStorageError;
631
632 type Future =
633 Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<Word>>, Self::Error>> + Send>>;
634
635 fn key_range(&self, contract_addr: ContentAddress, key: Key, num_words: usize) -> Self::Future {
636 let storage = self.clone();
637 async move { key_range(&storage, contract_addr, key, num_words).await }.boxed()
638 }
639}