essential_node_db/
lib.rs

1#![warn(missing_docs)]
2
3//! The node's DB interface and sqlite implementation.
4//!
5//! The core capability of the node is to:
6//!
7//! 1. Receive blocks from an L1 relayer and validate them.
8//! 2. Receive contracts from the p2p network so that they're available for validation.
9//!
10//! As a part of satisfying these requirements, this crate provides the basic
11//! functions required for safely creating the necessary tables and inserting/
12//! querying/updating them as necessary.
13
14pub use error::QueryError;
15use essential_hash::content_addr;
16#[doc(inline)]
17pub use essential_node_db_sql as sql;
18use essential_node_types::{block, Block, BlockHeader};
19use essential_types::{
20    convert::{bytes_from_word, word_from_bytes},
21    solution::{Mutation, Solution, SolutionSet},
22    ContentAddress, Hash, Key, PredicateAddress, Value, Word,
23};
24use futures::Stream;
25#[cfg(feature = "pool")]
26pub use pool::ConnectionPool;
27pub use query_range::address;
28pub use query_range::finalized;
29use rusqlite::{named_params, params, Connection, OptionalExtension, Transaction};
30use std::{ops::Range, time::Duration};
31
32mod error;
33#[cfg(feature = "pool")]
34pub mod pool;
35mod query_range;
36
37/// Types that may be provided to [`subscribe_blocks`] to provide access to
38/// [`Connection`]s while streaming.
39pub trait AcquireConnection {
40    /// Asynchronously acquire a handle to a [`Connection`].
41    ///
42    /// Returns `Some` in the case a connection could be acquired, or `None` in
43    /// the case that the connection source is no longer available.
44    #[allow(async_fn_in_trait)]
45    async fn acquire_connection(&self) -> Option<impl 'static + AsMut<Connection>>;
46}
47
/// A notification source that [`subscribe_blocks`] awaits on when it has run
/// out of blocks to yield.
pub trait AwaitNewBlock {
    /// Suspend until a new block becomes available.
    ///
    /// Resolves to `Some(())` once a new block is ready, or to `None` when the
    /// notification source is no longer available.
    // The `async_fn_in_trait` lint is explicitly allowed for this method.
    #[allow(async_fn_in_trait)]
    async fn await_new_block(&mut self) -> Option<()>;
}
58
59/// Create all tables.
60pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
61    for table in sql::table::ALL {
62        tx.execute(table.create, ())?;
63    }
64    Ok(())
65}
66
67/// For the given block:
68///
69/// 1. Insert an entry into the `block` table.
70/// 2. Insert each of its solution sets into the `solution_set` and `block_solution_set` tables.
71///
72/// Returns the `ContentAddress` of the inserted block.
73pub fn insert_block(tx: &Transaction, block: &Block) -> rusqlite::Result<ContentAddress> {
74    // Insert the header.
75    let secs = block.header.timestamp.as_secs();
76    let nanos = block.header.timestamp.subsec_nanos() as u64;
77    let solution_set_addrs: Vec<ContentAddress> =
78        block.solution_sets.iter().map(content_addr).collect();
79    let block_address =
80        block::addr::from_header_and_solution_set_addrs_slice(&block.header, &solution_set_addrs);
81
82    // TODO: Use real parent block address once blocks have parent addresses.
83    let parent_block_address = ContentAddress([0; 32]);
84
85    tx.execute(
86        sql::insert::BLOCK,
87        named_params! {
88            ":block_address": block_address.0,
89            ":parent_block_address": parent_block_address.0,
90            ":number": block.header.number,
91            ":timestamp_secs": secs,
92            ":timestamp_nanos": nanos,
93        },
94    )?;
95
96    // Insert all solution sets.
97    let mut stmt_solution_set = tx.prepare(sql::insert::SOLUTION_SET)?;
98    let mut stmt_block_solution_set = tx.prepare(sql::insert::BLOCK_SOLUTION_SET)?;
99    let mut stmt_solution = tx.prepare(sql::insert::SOLUTION)?;
100    let mut stmt_mutation = tx.prepare(sql::insert::MUTATION)?;
101    let mut stmt_pred_data = tx.prepare(sql::insert::PRED_DATA)?;
102
103    for (ix, (solution_set, ca)) in block
104        .solution_sets
105        .iter()
106        .zip(solution_set_addrs)
107        .enumerate()
108    {
109        // Insert the solution set.
110        stmt_solution_set.execute(named_params! {
111            ":content_addr": ca.0,
112        })?;
113
114        // Create a mapping between the block and the solution set.
115        stmt_block_solution_set.execute(named_params! {
116            ":block_address": block_address.0,
117            ":solution_set_addr": &ca.0,
118            ":solution_set_index": ix,
119        })?;
120
121        // Insert solutions.
122        for (solution_ix, solution) in solution_set.solutions.iter().enumerate() {
123            stmt_solution.execute(named_params! {
124                ":solution_set_addr": ca.0,
125                ":solution_index": solution_ix,
126                ":contract_addr": solution.predicate_to_solve.contract.0,
127                ":predicate_addr": solution.predicate_to_solve.predicate.0,
128            })?;
129            for (mutation_ix, mutation) in solution.state_mutations.iter().enumerate() {
130                stmt_mutation.execute(named_params! {
131                    ":solution_set_addr": ca.0,
132                    ":solution_index": solution_ix,
133                    ":mutation_index": mutation_ix,
134                    ":key": blob_from_words(&mutation.key),
135                    ":value": blob_from_words(&mutation.value),
136                })?;
137            }
138            for (pred_data_ix, pred_data) in solution.predicate_data.iter().enumerate() {
139                stmt_pred_data.execute(named_params! {
140                    ":solution_set_addr": ca.0,
141                    ":solution_index": solution_ix,
142                    ":pred_data_index": pred_data_ix,
143                    ":value": blob_from_words(pred_data)
144                })?;
145            }
146        }
147    }
148    stmt_solution_set.finalize()?;
149    stmt_block_solution_set.finalize()?;
150    stmt_solution.finalize()?;
151    stmt_mutation.finalize()?;
152    stmt_pred_data.finalize()?;
153
154    Ok(block_address)
155}
156
157/// Finalizes the block with the given hash.
158/// This sets the block to be the only block at a particular block number.
159pub fn finalize_block(conn: &Connection, block_address: &ContentAddress) -> rusqlite::Result<()> {
160    conn.execute(
161        sql::insert::FINALIZE_BLOCK,
162        named_params! {
163            ":block_address": block_address.0,
164        },
165    )?;
166    Ok(())
167}
168
169/// Inserts a failed block.
170pub fn insert_failed_block(
171    conn: &Connection,
172    block_address: &ContentAddress,
173    solution_set_addr: &ContentAddress,
174) -> rusqlite::Result<()> {
175    conn.execute(
176        sql::insert::FAILED_BLOCK,
177        named_params! {
178            ":block_address": block_address.0,
179            ":solution_set_addr": solution_set_addr.0,
180        },
181    )?;
182    Ok(())
183}
184
185/// Updates the state for a given contract content address and key.
186pub fn update_state(
187    conn: &Connection,
188    contract_ca: &ContentAddress,
189    key: &Key,
190    value: &Value,
191) -> rusqlite::Result<()> {
192    conn.execute(
193        sql::update::STATE,
194        named_params! {
195            ":contract_ca": contract_ca.0,
196            ":key": blob_from_words(key),
197            ":value": blob_from_words(value),
198        },
199    )?;
200    Ok(())
201}
202
203/// Updates the progress on validation.
204pub fn update_validation_progress(
205    conn: &Connection,
206    block_address: &ContentAddress,
207) -> rusqlite::Result<()> {
208    conn.execute(
209        sql::insert::VALIDATION_PROGRESS,
210        named_params! {
211            ":block_address": block_address.0,
212        },
213    )?;
214    Ok(())
215}
216
217/// Deletes the state for a given contract content address and key.
218pub fn delete_state(
219    conn: &Connection,
220    contract_ca: &ContentAddress,
221    key: &Key,
222) -> rusqlite::Result<()> {
223    conn.execute(
224        sql::update::DELETE_STATE,
225        named_params! {
226            ":contract_ca": contract_ca.0,
227            ":key": blob_from_words(key),
228        },
229    )?;
230    Ok(())
231}
232
233/// Fetches a solution set by its content address.
234pub fn get_solution_set(tx: &Transaction, ca: &ContentAddress) -> Result<SolutionSet, QueryError> {
235    let mut solution_stmt = tx.prepare(sql::query::GET_SOLUTION)?;
236    let mut solutions = solution_stmt
237        .query_map([ca.0], |row| {
238            let contract_addr = row.get::<_, Hash>("contract_addr")?;
239            let predicate_addr = row.get::<_, Hash>("predicate_addr")?;
240            Ok(Solution {
241                predicate_to_solve: PredicateAddress {
242                    contract: ContentAddress(contract_addr),
243                    predicate: ContentAddress(predicate_addr),
244                },
245                state_mutations: vec![],
246                predicate_data: vec![],
247            })
248        })?
249        .collect::<Result<Vec<_>, _>>()?;
250    solution_stmt.finalize()?;
251
252    let mut pred_data_stmt = tx.prepare(sql::query::GET_SOLUTION_PRED_DATA)?;
253    let mut mutations_stmt = tx.prepare(sql::query::GET_SOLUTION_MUTATIONS)?;
254
255    for (solution_ix, solution) in solutions.iter_mut().enumerate() {
256        // Fetch the mutations.
257        let mut mutation_rows = mutations_stmt.query(named_params! {
258            ":content_addr": ca.0,
259            ":solution_index": solution_ix,
260        })?;
261        while let Some(mutation_row) = mutation_rows.next()? {
262            let key_blob: Vec<u8> = mutation_row.get("key")?;
263            let value_blob: Vec<u8> = mutation_row.get("value")?;
264            let key: Key = words_from_blob(&key_blob);
265            let value: Value = words_from_blob(&value_blob);
266            solution.state_mutations.push(Mutation { key, value });
267        }
268
269        // Fetch the predicate data.
270        let mut pred_data_rows = pred_data_stmt.query(named_params! {
271            ":content_addr": ca.0,
272            ":solution_index": solution_ix,
273        })?;
274        while let Some(pred_data_row) = pred_data_rows.next()? {
275            let value_blob: Vec<u8> = pred_data_row.get("value")?;
276            let value: Value = words_from_blob(&value_blob);
277            solution.predicate_data.push(value);
278        }
279    }
280
281    mutations_stmt.finalize()?;
282    pred_data_stmt.finalize()?;
283
284    Ok(SolutionSet { solutions })
285}
286
287/// Fetches the state value for the given contract content address and key pair.
288pub fn query_state(
289    conn: &Connection,
290    contract_ca: &ContentAddress,
291    key: &Key,
292) -> Result<Option<Value>, QueryError> {
293    use rusqlite::OptionalExtension;
294    let mut stmt = conn.prepare(sql::query::GET_STATE)?;
295    let value_blob: Option<Vec<u8>> = stmt
296        .query_row(params![contract_ca.0, blob_from_words(key)], |row| {
297            row.get("value")
298        })
299        .optional()?;
300    Ok(value_blob.as_deref().map(words_from_blob))
301}
302
303/// Given a block address, returns the header for that block.
304///
305/// Returns the block number and block timestamp in that order.
306pub fn get_block_header(
307    conn: &Connection,
308    block_address: &ContentAddress,
309) -> rusqlite::Result<Option<BlockHeader>> {
310    conn.query_row(
311        sql::query::GET_BLOCK_HEADER,
312        named_params! {
313            ":block_address": block_address.0,
314        },
315        |row| {
316            let number: Word = row.get("number")?;
317            let timestamp_secs: u64 = row.get("timestamp_secs")?;
318            let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
319            let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
320            Ok(BlockHeader { number, timestamp })
321        },
322    )
323    .optional()
324}
325
326/// Returns the block with given address.
327pub fn get_block(
328    tx: &Transaction,
329    block_address: &ContentAddress,
330) -> Result<Option<Block>, QueryError> {
331    let Some(header) = get_block_header(tx, block_address)? else {
332        return Ok(None);
333    };
334    let mut stmt = tx.prepare(sql::query::GET_BLOCK)?;
335    let rows = stmt.query_map(
336        named_params! {
337            ":block_address": block_address.0,
338        },
339        |row| {
340            let solution_addr: Hash = row.get("content_addr")?;
341            Ok(ContentAddress(solution_addr))
342        },
343    )?;
344
345    let mut block = Block {
346        header,
347        solution_sets: vec![],
348    };
349    for res in rows {
350        let solution_set_addr: ContentAddress = res?;
351
352        // Add the solution set.
353        // If there are performance issues, use statements in `get_solution_set` directly.
354        // See https://github.com/essential-contributions/essential-node/issues/154.
355        let solution_set = get_solution_set(tx, &solution_set_addr)?;
356        block.solution_sets.push(solution_set);
357    }
358    Ok(Some(block))
359}
360
361/// Fetches the latest finalized block hash.
362pub fn get_latest_finalized_block_address(
363    conn: &Connection,
364) -> Result<Option<ContentAddress>, rusqlite::Error> {
365    conn.query_row(sql::query::GET_LATEST_FINALIZED_BLOCK_ADDRESS, [], |row| {
366        row.get::<_, Hash>("block_address").map(ContentAddress)
367    })
368    .optional()
369}
370
371/// Fetches the parent block address.
372pub fn get_parent_block_address(
373    conn: &Connection,
374    block_address: &ContentAddress,
375) -> Result<Option<ContentAddress>, rusqlite::Error> {
376    conn.query_row(
377        sql::query::GET_PARENT_BLOCK_ADDRESS,
378        named_params! {
379            ":block_address": block_address.0,
380        },
381        |row| row.get::<_, Hash>("block_address").map(ContentAddress),
382    )
383    .optional()
384}
385
386/// Fetches the last progress on validation.
387pub fn get_validation_progress(conn: &Connection) -> Result<Option<ContentAddress>, QueryError> {
388    let mut stmt = conn.prepare(sql::query::GET_VALIDATION_PROGRESS)?;
389    let value: Option<ContentAddress> = stmt
390        .query_row([], |row| {
391            let block_address: Hash = row.get("block_address")?;
392            Ok(ContentAddress(block_address))
393        })
394        .optional()?;
395    Ok(value)
396}
397
398/// Given a block address, returns the addresses of blocks that have the next block number.
399pub fn get_next_block_addresses(
400    conn: &Connection,
401    current_block: &ContentAddress,
402) -> Result<Vec<ContentAddress>, QueryError> {
403    let mut stmt = conn.prepare(sql::query::GET_NEXT_BLOCK_ADDRESSES)?;
404    let rows = stmt.query_map(
405        named_params! {
406            ":current_block": current_block.0,
407        },
408        |row| {
409            let block_address: Hash = row.get("block_address")?;
410            Ok(block_address)
411        },
412    )?;
413    let block_addresses = rows
414        .collect::<Result<Vec<_>, _>>()?
415        .iter()
416        .map(|hash| ContentAddress(*hash))
417        .collect();
418    Ok(block_addresses)
419}
420
421/// Lists all blocks in the given range.
422pub fn list_blocks(tx: &Transaction, block_range: Range<Word>) -> Result<Vec<Block>, QueryError> {
423    let mut stmt = tx.prepare(sql::query::LIST_BLOCKS)?;
424    let rows = stmt.query_map(
425        named_params! {
426            ":start_block": block_range.start,
427            ":end_block": block_range.end,
428        },
429        |row| {
430            let block_address: essential_types::Hash = row.get("block_address")?;
431            let block_number: Word = row.get("number")?;
432            let timestamp_secs: u64 = row.get("timestamp_secs")?;
433            let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
434            let solution_set_addr: Hash = row.get("content_addr")?;
435            let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
436            Ok((
437                block_address,
438                block_number,
439                timestamp,
440                ContentAddress(solution_set_addr),
441            ))
442        },
443    )?;
444
445    // Query yields in order of block number and solution set index.
446    let mut blocks: Vec<Block> = vec![];
447    let mut last_block_address = None;
448    for res in rows {
449        let (block_address, block_number, timestamp, solution_set_addr): (
450            essential_types::Hash,
451            Word,
452            Duration,
453            ContentAddress,
454        ) = res?;
455
456        // Fetch the block associated with the block number, inserting it first if new.
457        let block = match last_block_address {
458            Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
459            _ => {
460                last_block_address = Some(block_address);
461                blocks.push(Block {
462                    header: BlockHeader {
463                        number: block_number,
464                        timestamp,
465                    },
466                    solution_sets: vec![],
467                });
468                blocks.last_mut().expect("last block must exist")
469            }
470        };
471
472        // Add the solution set.
473        // If there are performance issues, use statements in `get_solution_set` directly.
474        // See https://github.com/essential-contributions/essential-node/issues/154.
475        let solution_set = get_solution_set(tx, &solution_set_addr)?;
476        block.solution_sets.push(solution_set);
477    }
478    Ok(blocks)
479}
480
481/// Lists blocks and their solution sets within a specific time range with pagination.
482pub fn list_blocks_by_time(
483    tx: &Transaction,
484    range: Range<Duration>,
485    page_size: i64,
486    page_number: i64,
487) -> Result<Vec<Block>, QueryError> {
488    let mut stmt = tx.prepare(sql::query::LIST_BLOCKS_BY_TIME)?;
489    let rows = stmt.query_map(
490        named_params! {
491            ":start_secs": range.start.as_secs(),
492            ":start_nanos": range.start.subsec_nanos(),
493            ":end_secs": range.end.as_secs(),
494            ":end_nanos": range.end.subsec_nanos(),
495            ":page_size": page_size,
496            ":page_number": page_number,
497        },
498        |row| {
499            let block_address: essential_types::Hash = row.get("block_address")?;
500            let block_number: Word = row.get("number")?;
501            let timestamp_secs: u64 = row.get("timestamp_secs")?;
502            let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
503            let solution_set_addr: Hash = row.get("content_addr")?;
504            let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
505            Ok((
506                block_address,
507                block_number,
508                timestamp,
509                ContentAddress(solution_set_addr),
510            ))
511        },
512    )?;
513
514    // Query yields in order of block number and solution set index.
515    let mut blocks: Vec<Block> = vec![];
516    let mut last_block_address: Option<essential_types::Hash> = None;
517    for res in rows {
518        let (block_address, block_number, timestamp, solution_set_addr): (
519            essential_types::Hash,
520            Word,
521            Duration,
522            ContentAddress,
523        ) = res?;
524
525        // Fetch the block associated with the block number, inserting it first if new.
526        let block = match last_block_address {
527            Some(n) if n == block_address => blocks.last_mut().expect("last block must exist"),
528            _ => {
529                last_block_address = Some(block_address);
530                blocks.push(Block {
531                    header: BlockHeader {
532                        number: block_number,
533                        timestamp,
534                    },
535                    solution_sets: vec![],
536                });
537                blocks.last_mut().expect("last block must exist")
538            }
539        };
540
541        // Add the solution set.
542        // If there are performance issues, use statements in `get_solution_set` directly.
543        // See https://github.com/essential-contributions/essential-node/issues/154.
544        let solution_set = get_solution_set(tx, &solution_set_addr)?;
545        block.solution_sets.push(solution_set);
546    }
547    Ok(blocks)
548}
549
550/// List failed blocks as (block number, solution set hash) within a given range.
551pub fn list_failed_blocks(
552    conn: &Connection,
553    block_range: Range<Word>,
554) -> Result<Vec<(Word, ContentAddress)>, QueryError> {
555    let mut stmt = conn.prepare(sql::query::LIST_FAILED_BLOCKS)?;
556    let rows = stmt.query_map(
557        named_params! {
558            ":start_block": block_range.start,
559            ":end_block": block_range.end,
560        },
561        |row| {
562            let block_number: Word = row.get("number")?;
563            let solution_set_addr: Hash = row.get("content_addr")?;
564            Ok((block_number, ContentAddress(solution_set_addr)))
565        },
566    )?;
567
568    let mut failed_blocks = vec![];
569    for res in rows {
570        let (block_number, solution_set_addr) = res?;
571        failed_blocks.push((block_number, solution_set_addr));
572    }
573    Ok(failed_blocks)
574}
575
576/// Lists all unchecked blocks in the given range.
577pub fn list_unchecked_blocks(
578    tx: &Transaction,
579    block_range: Range<Word>,
580) -> Result<Vec<Block>, QueryError> {
581    let mut stmt = tx.prepare(sql::query::LIST_UNCHECKED_BLOCKS)?;
582    let rows = stmt.query_map(
583        named_params! {
584            ":start_block": block_range.start,
585            ":end_block": block_range.end,
586        },
587        |row| {
588            let block_address: essential_types::Hash = row.get("block_address")?;
589            let block_number: Word = row.get("number")?;
590            let timestamp_secs: u64 = row.get("timestamp_secs")?;
591            let timestamp_nanos: u32 = row.get("timestamp_nanos")?;
592            let solution_set_addr: Hash = row.get("content_addr")?;
593            let timestamp = Duration::new(timestamp_secs, timestamp_nanos);
594            Ok((
595                block_address,
596                block_number,
597                timestamp,
598                ContentAddress(solution_set_addr),
599            ))
600        },
601    )?;
602
603    // Query yields in order of block number and solution set index.
604    let mut blocks: Vec<Block> = vec![];
605    let mut last_block_address = None;
606    for res in rows {
607        let (block_address, block_number, timestamp, solution_set_addr): (
608            essential_types::Hash,
609            Word,
610            Duration,
611            ContentAddress,
612        ) = res?;
613
614        // Fetch the block associated with the block number, inserting it first if new.
615        let block = match last_block_address {
616            Some(b) if b == block_address => blocks.last_mut().expect("last block must exist"),
617            _ => {
618                last_block_address = Some(block_address);
619                blocks.push(Block {
620                    header: BlockHeader {
621                        number: block_number,
622                        timestamp,
623                    },
624                    solution_sets: vec![],
625                });
626                blocks.last_mut().expect("last block must exist")
627            }
628        };
629
630        // Add the solution set.
631        // If there are performance issues, use statements in `get_solution_set` directly.
632        // See https://github.com/essential-contributions/essential-node/issues/154.
633        let solution_set = get_solution_set(tx, &solution_set_addr)?;
634        block.solution_sets.push(solution_set);
635    }
636    Ok(blocks)
637}
638
639/// Subscribe to all blocks from the given starting block number.
640///
641/// The given `acquire_conn` type will be used on each iteration to
642/// asynchronously acquire a handle to a new rusqlite `Connection` from a source
643/// such as a connection pool. If the returned future completes with `None`, it
644/// is assumed the source of `Connection`s has closed, and in turn the `Stream`
645/// will close.
646///
647/// The given `await_new_block` type will be used as a signal to check whether
648/// or not a new block is available within the DB. The returned stream will
649/// yield immediately for each block until a DB query indicates there are no
650/// more blocks, at which point `await_new_block` is called before continuing.
651/// If `await_new_block` returns `None`, the source of new block notifications
652/// is assumed to have been closed and the stream will close.
653pub fn subscribe_blocks(
654    start_block: Word,
655    acquire_conn: impl AcquireConnection,
656    await_new_block: impl AwaitNewBlock,
657) -> impl Stream<Item = Result<Block, QueryError>> {
658    // Helper function to list blocks by block number range.
659    fn list_blocks_by_conn(
660        conn: &mut Connection,
661        block_range: Range<Word>,
662    ) -> Result<Vec<Block>, QueryError> {
663        let tx = conn.transaction()?;
664        let blocks = list_blocks(&tx, block_range)?;
665        drop(tx);
666        Ok(blocks)
667    }
668
669    let init = (start_block, acquire_conn, await_new_block);
670    futures::stream::unfold(init, move |(block_ix, acq_conn, mut new_block)| {
671        let next_ix = block_ix + 1;
672        async move {
673            loop {
674                // Acquire a connection and query for the current block.
675                let mut conn = acq_conn.acquire_connection().await?;
676                let res = list_blocks_by_conn(conn.as_mut(), block_ix..next_ix);
677                // Drop the connection ASAP in case it needs returning to a pool.
678                std::mem::drop(conn);
679                match res {
680                    // If some error occurred, emit the error.
681                    Err(err) => return Some((Err(err), (block_ix, acq_conn, new_block))),
682                    // If the query succeeded, pop the single block.
683                    Ok(mut vec) => match vec.pop() {
684                        // If there were no matching blocks, await the next.
685                        None => new_block.await_new_block().await?,
686                        // If we have the block, emit it.
687                        Some(block) => return Some((Ok(block), (next_ix, acq_conn, new_block))),
688                    },
689                }
690            }
691        }
692    })
693}
694
695/// Short-hand for constructing a transaction, providing it as an argument to
696/// the given function, then committing the transaction before returning.
697pub fn with_tx<T, E>(
698    conn: &mut rusqlite::Connection,
699    f: impl FnOnce(&mut Transaction) -> Result<T, E>,
700) -> Result<T, E>
701where
702    E: From<rusqlite::Error>,
703{
704    let mut tx = conn.transaction()?;
705    let out = f(&mut tx)?;
706    tx.commit()?;
707    Ok(out)
708}
709
710/// Short-hand for constructing a transaction, providing it as an argument to
711/// the given function, then dropping the transaction before returning.
712pub fn with_tx_dropped<T, E>(
713    conn: &mut rusqlite::Connection,
714    f: impl FnOnce(&mut Transaction) -> Result<T, E>,
715) -> Result<T, E>
716where
717    E: From<rusqlite::Error>,
718{
719    let mut tx = conn.transaction()?;
720    let out = f(&mut tx)?;
721    drop(tx);
722    Ok(out)
723}
724
725/// Convert a slice of `Word`s into a blob.
726pub fn blob_from_words(words: &[Word]) -> Vec<u8> {
727    words.iter().copied().flat_map(bytes_from_word).collect()
728}
729/// Convert a blob into a vector of `Word`s.
730pub fn words_from_blob(bytes: &[u8]) -> Vec<Word> {
731    bytes
732        .chunks_exact(core::mem::size_of::<Word>())
733        .map(|bytes| word_from_bytes(bytes.try_into().expect("Can't fail due to chunks exact")))
734        .collect()
735}