1#![warn(missing_docs)]
2
3pub use error::QueryError;
15use essential_hash::content_addr;
16#[doc(inline)]
17pub use essential_node_db_sql as sql;
18use essential_node_types::{block, Block, BlockHeader};
19use essential_types::{
20 convert::{bytes_from_word, word_from_bytes},
21 solution::{Mutation, Solution, SolutionSet},
22 ContentAddress, Hash, Key, PredicateAddress, Value, Word,
23};
24use futures::Stream;
25#[cfg(feature = "pool")]
26pub use pool::ConnectionPool;
27pub use query_range::address;
28pub use query_range::finalized;
29use rusqlite::{named_params, params, Connection, OptionalExtension, Transaction};
30use std::{ops::Range, time::Duration};
31
32mod error;
33#[cfg(feature = "pool")]
34pub mod pool;
35mod query_range;
36
37pub trait AcquireConnection {
40 #[allow(async_fn_in_trait)]
45 async fn acquire_connection(&self) -> Option<impl 'static + AsMut<Connection>>;
46}
47
48pub trait AwaitNewBlock {
51 #[allow(async_fn_in_trait)]
56 async fn await_new_block(&mut self) -> Option<()>;
57}
58
59pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
61 for table in sql::table::ALL {
62 tx.execute(table.create, ())?;
63 }
64 Ok(())
65}
66
67pub fn insert_block(tx: &Transaction, block: &Block) -> rusqlite::Result<ContentAddress> {
74 let secs = block.header.timestamp.as_secs();
76 let nanos = block.header.timestamp.subsec_nanos() as u64;
77 let solution_set_addrs: Vec<ContentAddress> =
78 block.solution_sets.iter().map(content_addr).collect();
79 let block_address =
80 block::addr::from_header_and_solution_set_addrs_slice(&block.header, &solution_set_addrs);
81
82 let parent_block_address = ContentAddress([0; 32]);
84
85 tx.execute(
86 sql::insert::BLOCK,
87 named_params! {
88 ":block_address": block_address.0,
89 ":parent_block_address": parent_block_address.0,
90 ":number": block.header.number,
91 ":timestamp_secs": secs,
92 ":timestamp_nanos": nanos,
93 },
94 )?;
95
96 let mut stmt_solution_set = tx.prepare(sql::insert::SOLUTION_SET)?;
98 let mut stmt_block_solution_set = tx.prepare(sql::insert::BLOCK_SOLUTION_SET)?;
99 let mut stmt_solution = tx.prepare(sql::insert::SOLUTION)?;
100 let mut stmt_mutation = tx.prepare(sql::insert::MUTATION)?;
101 let mut stmt_pred_data = tx.prepare(sql::insert::PRED_DATA)?;
102
103 for (ix, (solution_set, ca)) in block
104 .solution_sets
105 .iter()
106 .zip(solution_set_addrs)
107 .enumerate()
108 {
109 stmt_solution_set.execute(named_params! {
111 ":content_addr": ca.0,
112 })?;
113
114 stmt_block_solution_set.execute(named_params! {
116 ":block_address": block_address.0,
117 ":solution_set_addr": &ca.0,
118 ":solution_set_index": ix,
119 })?;
120
121 for (solution_ix, solution) in solution_set.solutions.iter().enumerate() {
123 stmt_solution.execute(named_params! {
124 ":solution_set_addr": ca.0,
125 ":solution_index": solution_ix,
126 ":contract_addr": solution.predicate_to_solve.contract.0,
127 ":predicate_addr": solution.predicate_to_solve.predicate.0,
128 })?;
129 for (mutation_ix, mutation) in solution.state_mutations.iter().enumerate() {
130 stmt_mutation.execute(named_params! {
131 ":solution_set_addr": ca.0,
132 ":solution_index": solution_ix,
133 ":mutation_index": mutation_ix,
134 ":key": blob_from_words(&mutation.key),
135 ":value": blob_from_words(&mutation.value),
136 })?;
137 }
138 for (pred_data_ix, pred_data) in solution.predicate_data.iter().enumerate() {
139 stmt_pred_data.execute(named_params! {
140 ":solution_set_addr": ca.0,
141 ":solution_index": solution_ix,
142 ":pred_data_index": pred_data_ix,
143 ":value": blob_from_words(pred_data)
144 })?;
145 }
146 }
147 }
148 stmt_solution_set.finalize()?;
149 stmt_block_solution_set.finalize()?;
150 stmt_solution.finalize()?;
151 stmt_mutation.finalize()?;
152 stmt_pred_data.finalize()?;
153
154 Ok(block_address)
155}
156
157pub fn finalize_block(conn: &Connection, block_address: &ContentAddress) -> rusqlite::Result<()> {
160 conn.execute(
161 sql::insert::FINALIZE_BLOCK,
162 named_params! {
163 ":block_address": block_address.0,
164 },
165 )?;
166 Ok(())
167}
168
169pub fn insert_failed_block(
171 conn: &Connection,
172 block_address: &ContentAddress,
173 solution_set_addr: &ContentAddress,
174) -> rusqlite::Result<()> {
175 conn.execute(
176 sql::insert::FAILED_BLOCK,
177 named_params! {
178 ":block_address": block_address.0,
179 ":solution_set_addr": solution_set_addr.0,
180 },
181 )?;
182 Ok(())
183}
184
185pub fn update_state(
187 conn: &Connection,
188 contract_ca: &ContentAddress,
189 key: &Key,
190 value: &Value,
191) -> rusqlite::Result<()> {
192 conn.execute(
193 sql::update::STATE,
194 named_params! {
195 ":contract_ca": contract_ca.0,
196 ":key": blob_from_words(key),
197 ":value": blob_from_words(value),
198 },
199 )?;
200 Ok(())
201}
202
203pub fn update_validation_progress(
205 conn: &Connection,
206 block_address: &ContentAddress,
207) -> rusqlite::Result<()> {
208 conn.execute(
209 sql::insert::VALIDATION_PROGRESS,
210 named_params! {
211 ":block_address": block_address.0,
212 },
213 )?;
214 Ok(())
215}
216
217pub fn delete_state(
219 conn: &Connection,
220 contract_ca: &ContentAddress,
221 key: &Key,
222) -> rusqlite::Result<()> {
223 conn.execute(
224 sql::update::DELETE_STATE,
225 named_params! {
226 ":contract_ca": contract_ca.0,
227 ":key": blob_from_words(key),
228 },
229 )?;
230 Ok(())
231}
232
233pub fn get_solution_set(tx: &Transaction, ca: &ContentAddress) -> Result<SolutionSet, QueryError> {
235 let mut solution_stmt = tx.prepare(sql::query::GET_SOLUTION)?;
236 let mut solutions = solution_stmt
237 .query_map([ca.0], |row| {
238 let contract_addr = row.get::<_, Hash>("contract_addr")?;
239 let predicate_addr = row.get::<_, Hash>("predicate_addr")?;
240 Ok(Solution {
241 predicate_to_solve: PredicateAddress {
242 contract: ContentAddress(contract_addr),
243 predicate: ContentAddress(predicate_addr),
244 },
245 state_mutations: vec![],
246 predicate_data: vec![],
247 })
248 })?
249 .collect::<Result<Vec<_>, _>>()?;
250 solution_stmt.finalize()?;
251
252 let mut pred_data_stmt = tx.prepare(sql::query::GET_SOLUTION_PRED_DATA)?;
253 let mut mutations_stmt = tx.prepare(sql::query::GET_SOLUTION_MUTATIONS)?;
254
255 for (solution_ix, solution) in solutions.iter_mut().enumerate() {
256 let mut mutation_rows = mutations_stmt.query(named_params! {
258 ":content_addr": ca.0,
259 ":solution_index": solution_ix,
260 })?;
261 while let Some(mutation_row) = mutation_rows.next()? {
262 let key_blob: Vec<u8> = mutation_row.get("key")?;
263 let value_blob: Vec<u8> = mutation_row.get("value")?;
264 let key: Key = words_from_blob(&key_blob);
265 let value: Value = words_from_blob(&value_blob);
266 solution.state_mutations.push(Mutation { key, value });
267 }
268
269 let mut pred_data_rows = pred_data_stmt.query(named_params! {
271 ":content_addr": ca.0,
272 ":solution_index": solution_ix,
273 })?;
274 while let Some(pred_data_row) = pred_data_rows.next()? {
275 let value_blob: Vec<u8> = pred_data_row.get("value")?;
276 let value: Value = words_from_blob(&value_blob);
277 solution.predicate_data.push(value);
278 }
279 }
280
281 mutations_stmt.finalize()?;
282 pred_data_stmt.finalize()?;
283
284 Ok(SolutionSet { solutions })
285}
286
287pub fn query_state(
289 conn: &Connection,
290 contract_ca: &ContentAddress,
291 key: &Key,
292) -> Result<Option<Value>, QueryError> {
293 use rusqlite::OptionalExtension;
294 let mut stmt = conn.prepare(sql::query::GET_STATE)?;
295 let value_blob: Option<Vec<u8>> = stmt
296 .query_row(params![contract_ca.0, blob_from_words(key)], |row| {
297 row.get("value")
298 })
299 .optional()?;
300 Ok(value_blob.as_deref().map(words_from_blob))
301}
302
303pub fn get_block_header(
307 conn: &Connection,
308 block_address: &ContentAddress,
309) -> rusqlite::Result<Option<BlockHeader>> {
310 conn.query_row(
311 sql::query::GET_BLOCK_HEADER,
312 named_params! {
313 ":block_address": block_address.0,
314 },
315 |row| {
316 let number: Word = row.get("number")?;
317 let timestamp_secs: u64 = row.get("timestamp_secs")?;
318 let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
319 let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
320 Ok(BlockHeader { number, timestamp })
321 },
322 )
323 .optional()
324}
325
326pub fn get_block(
328 tx: &Transaction,
329 block_address: &ContentAddress,
330) -> Result<Option<Block>, QueryError> {
331 let Some(header) = get_block_header(tx, block_address)? else {
332 return Ok(None);
333 };
334 let mut stmt = tx.prepare(sql::query::GET_BLOCK)?;
335 let rows = stmt.query_map(
336 named_params! {
337 ":block_address": block_address.0,
338 },
339 |row| {
340 let solution_addr: Hash = row.get("content_addr")?;
341 Ok(ContentAddress(solution_addr))
342 },
343 )?;
344
345 let mut block = Block {
346 header,
347 solution_sets: vec![],
348 };
349 for res in rows {
350 let solution_set_addr: ContentAddress = res?;
351
352 let solution_set = get_solution_set(tx, &solution_set_addr)?;
356 block.solution_sets.push(solution_set);
357 }
358 Ok(Some(block))
359}
360
361pub fn get_latest_finalized_block_address(
363 conn: &Connection,
364) -> Result<Option<ContentAddress>, rusqlite::Error> {
365 conn.query_row(sql::query::GET_LATEST_FINALIZED_BLOCK_ADDRESS, [], |row| {
366 row.get::<_, Hash>("block_address").map(ContentAddress)
367 })
368 .optional()
369}
370
371pub fn get_parent_block_address(
373 conn: &Connection,
374 block_address: &ContentAddress,
375) -> Result<Option<ContentAddress>, rusqlite::Error> {
376 conn.query_row(
377 sql::query::GET_PARENT_BLOCK_ADDRESS,
378 named_params! {
379 ":block_address": block_address.0,
380 },
381 |row| row.get::<_, Hash>("block_address").map(ContentAddress),
382 )
383 .optional()
384}
385
386pub fn get_validation_progress(conn: &Connection) -> Result<Option<ContentAddress>, QueryError> {
388 let mut stmt = conn.prepare(sql::query::GET_VALIDATION_PROGRESS)?;
389 let value: Option<ContentAddress> = stmt
390 .query_row([], |row| {
391 let block_address: Hash = row.get("block_address")?;
392 Ok(ContentAddress(block_address))
393 })
394 .optional()?;
395 Ok(value)
396}
397
398pub fn get_next_block_addresses(
400 conn: &Connection,
401 current_block: &ContentAddress,
402) -> Result<Vec<ContentAddress>, QueryError> {
403 let mut stmt = conn.prepare(sql::query::GET_NEXT_BLOCK_ADDRESSES)?;
404 let rows = stmt.query_map(
405 named_params! {
406 ":current_block": current_block.0,
407 },
408 |row| {
409 let block_address: Hash = row.get("block_address")?;
410 Ok(block_address)
411 },
412 )?;
413 let block_addresses = rows
414 .collect::<Result<Vec<_>, _>>()?
415 .iter()
416 .map(|hash| ContentAddress(*hash))
417 .collect();
418 Ok(block_addresses)
419}
420
421pub fn list_blocks(tx: &Transaction, block_range: Range<Word>) -> Result<Vec<Block>, QueryError> {
423 let mut stmt = tx.prepare(sql::query::LIST_BLOCKS)?;
424 let rows = stmt.query_map(
425 named_params! {
426 ":start_block": block_range.start,
427 ":end_block": block_range.end,
428 },
429 |row| {
430 let block_address: essential_types::Hash = row.get("block_address")?;
431 let block_number: Word = row.get("number")?;
432 let timestamp_secs: u64 = row.get("timestamp_secs")?;
433 let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
434 let solution_set_addr: Hash = row.get("content_addr")?;
435 let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
436 Ok((
437 block_address,
438 block_number,
439 timestamp,
440 ContentAddress(solution_set_addr),
441 ))
442 },
443 )?;
444
445 let mut blocks: Vec<Block> = vec![];
447 let mut last_block_address = None;
448 for res in rows {
449 let (block_address, block_number, timestamp, solution_set_addr): (
450 essential_types::Hash,
451 Word,
452 Duration,
453 ContentAddress,
454 ) = res?;
455
456 let block = match last_block_address {
458 Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
459 _ => {
460 last_block_address = Some(block_address);
461 blocks.push(Block {
462 header: BlockHeader {
463 number: block_number,
464 timestamp,
465 },
466 solution_sets: vec![],
467 });
468 blocks.last_mut().expect("last block must exist")
469 }
470 };
471
472 let solution_set = get_solution_set(tx, &solution_set_addr)?;
476 block.solution_sets.push(solution_set);
477 }
478 Ok(blocks)
479}
480
481pub fn list_blocks_by_time(
483 tx: &Transaction,
484 range: Range<Duration>,
485 page_size: i64,
486 page_number: i64,
487) -> Result<Vec<Block>, QueryError> {
488 let mut stmt = tx.prepare(sql::query::LIST_BLOCKS_BY_TIME)?;
489 let rows = stmt.query_map(
490 named_params! {
491 ":start_secs": range.start.as_secs(),
492 ":start_nanos": range.start.subsec_nanos(),
493 ":end_secs": range.end.as_secs(),
494 ":end_nanos": range.end.subsec_nanos(),
495 ":page_size": page_size,
496 ":page_number": page_number,
497 },
498 |row| {
499 let block_address: essential_types::Hash = row.get("block_address")?;
500 let block_number: Word = row.get("number")?;
501 let timestamp_secs: u64 = row.get("timestamp_secs")?;
502 let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
503 let solution_set_addr: Hash = row.get("content_addr")?;
504 let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
505 Ok((
506 block_address,
507 block_number,
508 timestamp,
509 ContentAddress(solution_set_addr),
510 ))
511 },
512 )?;
513
514 let mut blocks: Vec<Block> = vec![];
516 let mut last_block_address: Option<essential_types::Hash> = None;
517 for res in rows {
518 let (block_address, block_number, timestamp, solution_set_addr): (
519 essential_types::Hash,
520 Word,
521 Duration,
522 ContentAddress,
523 ) = res?;
524
525 let block = match last_block_address {
527 Some(n) if n == block_address => blocks.last_mut().expect("last block must exist"),
528 _ => {
529 last_block_address = Some(block_address);
530 blocks.push(Block {
531 header: BlockHeader {
532 number: block_number,
533 timestamp,
534 },
535 solution_sets: vec![],
536 });
537 blocks.last_mut().expect("last block must exist")
538 }
539 };
540
541 let solution_set = get_solution_set(tx, &solution_set_addr)?;
545 block.solution_sets.push(solution_set);
546 }
547 Ok(blocks)
548}
549
550pub fn list_failed_blocks(
552 conn: &Connection,
553 block_range: Range<Word>,
554) -> Result<Vec<(Word, ContentAddress)>, QueryError> {
555 let mut stmt = conn.prepare(sql::query::LIST_FAILED_BLOCKS)?;
556 let rows = stmt.query_map(
557 named_params! {
558 ":start_block": block_range.start,
559 ":end_block": block_range.end,
560 },
561 |row| {
562 let block_number: Word = row.get("number")?;
563 let solution_set_addr: Hash = row.get("content_addr")?;
564 Ok((block_number, ContentAddress(solution_set_addr)))
565 },
566 )?;
567
568 let mut failed_blocks = vec![];
569 for res in rows {
570 let (block_number, solution_set_addr) = res?;
571 failed_blocks.push((block_number, solution_set_addr));
572 }
573 Ok(failed_blocks)
574}
575
576pub fn list_unchecked_blocks(
578 tx: &Transaction,
579 block_range: Range<Word>,
580) -> Result<Vec<Block>, QueryError> {
581 let mut stmt = tx.prepare(sql::query::LIST_UNCHECKED_BLOCKS)?;
582 let rows = stmt.query_map(
583 named_params! {
584 ":start_block": block_range.start,
585 ":end_block": block_range.end,
586 },
587 |row| {
588 let block_address: essential_types::Hash = row.get("block_address")?;
589 let block_number: Word = row.get("number")?;
590 let timestamp_secs: u64 = row.get("timestamp_secs")?;
591 let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
592 let solution_set_addr: Hash = row.get("content_addr")?;
593 let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
594 Ok((
595 block_address,
596 block_number,
597 timestamp,
598 ContentAddress(solution_set_addr),
599 ))
600 },
601 )?;
602
603 let mut blocks: Vec<Block> = vec![];
605 let mut last_block_address = None;
606 for res in rows {
607 let (block_address, block_number, timestamp, solution_set_addr): (
608 essential_types::Hash,
609 Word,
610 Duration,
611 ContentAddress,
612 ) = res?;
613
614 let block = match last_block_address {
616 Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
617 _ => {
618 last_block_address = Some(block_address);
619 blocks.push(Block {
620 header: BlockHeader {
621 number: block_number,
622 timestamp,
623 },
624 solution_sets: vec![],
625 });
626 blocks.last_mut().expect("last block must exist")
627 }
628 };
629
630 let solution_set = get_solution_set(tx, &solution_set_addr)?;
634 block.solution_sets.push(solution_set);
635 }
636 Ok(blocks)
637}
638
639pub fn subscribe_blocks(
654 start_block: Word,
655 acquire_conn: impl AcquireConnection,
656 await_new_block: impl AwaitNewBlock,
657) -> impl Stream<Item = Result<Block, QueryError>> {
658 fn list_blocks_by_conn(
660 conn: &mut Connection,
661 block_range: Range<Word>,
662 ) -> Result<Vec<Block>, QueryError> {
663 let tx = conn.transaction()?;
664 let blocks = list_blocks(&tx, block_range)?;
665 drop(tx);
666 Ok(blocks)
667 }
668
669 let init = (start_block, acquire_conn, await_new_block);
670 futures::stream::unfold(init, move |(block_ix, acq_conn, mut new_block)| {
671 let next_ix = block_ix + 1;
672 async move {
673 loop {
674 let mut conn = acq_conn.acquire_connection().await?;
676 let res = list_blocks_by_conn(conn.as_mut(), block_ix..next_ix);
677 std::mem::drop(conn);
679 match res {
680 Err(err) => return Some((Err(err), (block_ix, acq_conn, new_block))),
682 Ok(mut vec) => match vec.pop() {
684 None => new_block.await_new_block().await?,
686 Some(block) => return Some((Ok(block), (next_ix, acq_conn, new_block))),
688 },
689 }
690 }
691 }
692 })
693}
694
695pub fn with_tx<T, E>(
698 conn: &mut rusqlite::Connection,
699 f: impl FnOnce(&mut Transaction) -> Result<T, E>,
700) -> Result<T, E>
701where
702 E: From<rusqlite::Error>,
703{
704 let mut tx = conn.transaction()?;
705 let out = f(&mut tx)?;
706 tx.commit()?;
707 Ok(out)
708}
709
710pub fn with_tx_dropped<T, E>(
713 conn: &mut rusqlite::Connection,
714 f: impl FnOnce(&mut Transaction) -> Result<T, E>,
715) -> Result<T, E>
716where
717 E: From<rusqlite::Error>,
718{
719 let mut tx = conn.transaction()?;
720 let out = f(&mut tx)?;
721 drop(tx);
722 Ok(out)
723}
724
725pub fn blob_from_words(words: &[Word]) -> Vec<u8> {
727 words.iter().copied().flat_map(bytes_from_word).collect()
728}
729pub fn words_from_blob(bytes: &[u8]) -> Vec<Word> {
731 bytes
732 .chunks_exact(core::mem::size_of::<Word>())
733 .map(|bytes| word_from_bytes(bytes.try_into().expect("Can't fail due to chunks exact")))
734 .collect()
735}