1use error::{DecodeError, QueryError};
22use essential_builder_types::SolutionSetFailure;
23use essential_hash::content_addr;
24use essential_types::{solution::SolutionSet, ContentAddress, Hash};
25#[cfg(feature = "pool")]
26pub use pool::ConnectionPool;
27use rusqlite::{named_params, Connection, OptionalExtension, Transaction};
28use serde::{Deserialize, Serialize};
29use std::{ops::Range, time::Duration};
30
31pub mod error;
32#[cfg(feature = "pool")]
33pub mod pool;
34pub mod sql;
35
36pub fn encode<T>(value: &T) -> Vec<u8>
40where
41 T: Serialize,
42{
43 postcard::to_allocvec(value).expect("postcard serialization cannot fail")
44}
45
46pub fn decode<T>(value: &[u8]) -> Result<T, DecodeError>
50where
51 T: for<'de> Deserialize<'de>,
52{
53 Ok(postcard::from_bytes(value)?)
54}
55
56pub fn create_tables(tx: &Transaction) -> rusqlite::Result<()> {
58 for table in sql::table::ALL {
59 tx.execute(table.create, ())?;
60 }
61 Ok(())
62}
63
64pub fn insert_solution_set_submission(
69 tx: &Transaction,
70 solution_set: &SolutionSet,
71 timestamp: Duration,
72) -> rusqlite::Result<ContentAddress> {
73 let ca = content_addr(solution_set);
75 let ca_blob = &ca.0;
76 let solution_set_blob = encode(solution_set);
77 tx.execute(
78 sql::insert::SOLUTION_SET,
79 named_params! {
80 ":content_addr": &ca_blob,
81 ":solution_set": solution_set_blob,
82 },
83 )?;
84
85 let secs = timestamp.as_secs();
87 let nanos = timestamp.subsec_nanos();
88 tx.execute(
89 sql::insert::SUBMISSION,
90 named_params! {
91 ":solution_set_addr": ca_blob,
92 ":timestamp_secs": secs,
93 ":timestamp_nanos": nanos,
94 },
95 )?;
96
97 Ok(ca)
98}
99
100pub fn insert_solution_set_failure(
107 conn: &Connection,
108 solution_set_ca: &ContentAddress,
109 solution_set_failure: SolutionSetFailure,
110) -> rusqlite::Result<()> {
111 conn.execute(
112 sql::insert::SOLUTION_SET_FAILURE,
113 named_params! {
114 ":solution_set_addr": &solution_set_ca.0,
115 ":attempt_block_num": solution_set_failure.attempt_block_num,
116 ":attempt_block_addr": &solution_set_failure.attempt_block_addr.0,
117 ":attempt_solution_set_ix": solution_set_failure.attempt_solution_set_ix,
118 ":err_msg": solution_set_failure.err_msg.as_bytes(),
119 },
120 )?;
121 Ok(())
122}
123
124pub fn get_solution_set(
126 conn: &Connection,
127 ca: &ContentAddress,
128) -> Result<Option<SolutionSet>, QueryError> {
129 let ca_blob = &ca.0;
130 let mut stmt = conn.prepare(sql::query::GET_SOLUTION_SET)?;
131 let solution_set_blob: Option<Vec<u8>> = stmt
132 .query_row([ca_blob], |row| row.get("solution_set"))
133 .optional()?;
134 Ok(solution_set_blob.as_deref().map(decode).transpose()?)
135}
136
137pub fn list_solution_sets(
144 conn: &Connection,
145 time_range: Range<Duration>,
146 limit: i64,
147) -> Result<Vec<(ContentAddress, SolutionSet, Duration)>, QueryError> {
148 let mut stmt = conn.prepare(sql::query::LIST_SOLUTION_SETS)?;
149 let start_secs = time_range.start.as_secs();
150 let start_nanos = time_range.start.subsec_nanos();
151 let end_secs = time_range.end.as_secs();
152 let end_nanos = time_range.end.subsec_nanos();
153 let rows = stmt.query_map(
154 named_params! {
155 ":start_secs": start_secs,
156 ":start_nanos": start_nanos,
157 ":end_secs": end_secs,
158 ":end_nanos": end_nanos,
159 ":limit": limit,
160 },
161 |row| {
162 let solution_set_addr_blob: Hash = row.get("content_addr")?;
163 let solution_set_blob: Vec<u8> = row.get("solution_set")?;
164 let secs: u64 = row.get("timestamp_secs")?;
165 let nanos: u32 = row.get("timestamp_nanos")?;
166 let timestamp = Duration::new(secs, nanos);
167 Ok((
168 ContentAddress(solution_set_addr_blob),
169 solution_set_blob,
170 timestamp,
171 ))
172 },
173 )?;
174 rows.into_iter()
175 .map(|res| {
176 let (ca, blob, ts) = res?;
177 let solution_set = decode(&blob)?;
178 Ok((ca, solution_set, ts))
179 })
180 .collect()
181}
182
183pub fn list_submissions(
187 conn: &Connection,
188 time_range: Range<Duration>,
189 limit: i64,
190) -> rusqlite::Result<Vec<(ContentAddress, Duration)>> {
191 let mut stmt = conn.prepare(sql::query::LIST_SUBMISSIONS)?;
192 let start_secs = time_range.start.as_secs();
193 let start_nanos = time_range.start.subsec_nanos();
194 let end_secs = time_range.end.as_secs();
195 let end_nanos = time_range.end.subsec_nanos();
196 let rows = stmt.query_map(
197 named_params! {
198 ":start_secs": start_secs,
199 ":start_nanos": start_nanos,
200 ":end_secs": end_secs,
201 ":end_nanos": end_nanos,
202 ":limit": limit,
203 },
204 |row| {
205 let solution_set_addr_blob: Hash = row.get("content_addr")?;
206 let secs: u64 = row.get("timestamp_secs")?;
207 let nanos: u32 = row.get("timestamp_nanos")?;
208 let timestamp = Duration::new(secs, nanos);
209 Ok((ContentAddress(solution_set_addr_blob), timestamp))
210 },
211 )?;
212 rows.collect()
213}
214
215pub fn latest_solution_set_failures(
221 conn: &Connection,
222 ca: &ContentAddress,
223 limit: u32,
224) -> rusqlite::Result<Vec<SolutionSetFailure<'static>>> {
225 let ca_blob = &ca.0;
226 let mut stmt = conn.prepare(sql::query::LATEST_SOLUTION_SET_FAILURES)?;
227 let rows = stmt.query_map(
228 named_params! {
229 ":solution_set_addr": ca_blob,
230 ":limit": limit,
231 },
232 |row| {
233 let attempt_block_num: i64 = row.get("attempt_block_num")?;
234 let attempt_block_addr: Hash = row.get("attempt_block_addr")?;
235 let attempt_solution_set_ix: u32 = row.get("attempt_solution_set_ix")?;
236 let err_msg_blob: Vec<u8> = row.get("err_msg")?;
237 let err_msg = String::from_utf8_lossy(&err_msg_blob).into_owned();
238 Ok(SolutionSetFailure {
239 attempt_block_num,
240 attempt_block_addr: ContentAddress(attempt_block_addr),
241 attempt_solution_set_ix,
242 err_msg: err_msg.into(),
243 })
244 },
245 )?;
246 rows.collect()
247}
248
249pub fn list_solution_set_failures(
255 conn: &Connection,
256 offset: u32,
257 limit: u32,
258) -> rusqlite::Result<Vec<SolutionSetFailure<'static>>> {
259 let mut stmt = conn.prepare(sql::query::LIST_SOLUTION_SET_FAILURES)?;
260 let rows = stmt.query_map(
261 named_params! {
262 ":offset": offset,
263 ":limit": limit,
264 },
265 |row| {
266 let attempt_block_num: i64 = row.get("attempt_block_num")?;
267 let attempt_block_addr: Hash = row.get("attempt_block_addr")?;
268 let attempt_solution_set_ix: u32 = row.get("attempt_solution_set_ix")?;
269 let err_msg_blob: Vec<u8> = row.get("err_msg")?;
270 let err_msg = String::from_utf8_lossy(&err_msg_blob).into_owned();
271 Ok(SolutionSetFailure {
272 attempt_block_num,
273 attempt_block_addr: ContentAddress(attempt_block_addr),
274 attempt_solution_set_ix,
275 err_msg: err_msg.into(),
276 })
277 },
278 )?;
279 rows.collect()
280}
281
282pub fn delete_solution_set(conn: &Connection, ca: &ContentAddress) -> rusqlite::Result<()> {
286 let ca_blob = &ca.0;
287 conn.execute(
288 sql::delete::SOLUTION_SET,
289 named_params! {
290 ":content_addr": ca_blob,
291 },
292 )?;
293 Ok(())
294}
295
296pub fn delete_solution_sets(
300 tx: &Transaction,
301 cas: impl IntoIterator<Item = ContentAddress>,
302) -> rusqlite::Result<()> {
303 for ca in cas {
304 crate::delete_solution_set(tx, &ca)?;
305 }
306 Ok(())
307}
308
309pub fn delete_oldest_solution_set_failures(
312 conn: &Connection,
313 keep_limit: u32,
314) -> rusqlite::Result<()> {
315 conn.execute(
316 sql::delete::OLDEST_SOLUTION_SET_FAILURES,
317 named_params! {
318 ":keep_limit": keep_limit,
319 },
320 )?;
321 Ok(())
322}
323
324pub fn with_tx<T, E>(
327 conn: &mut rusqlite::Connection,
328 f: impl FnOnce(&mut Transaction) -> Result<T, E>,
329) -> Result<T, E>
330where
331 E: From<rusqlite::Error>,
332{
333 let mut tx = conn.transaction()?;
334 let out = f(&mut tx)?;
335 tx.commit()?;
336 Ok(out)
337}