1use std::{
2 collections::{hash_map::DefaultHasher, BTreeMap},
3 hash::{Hash, Hasher},
4 sync::{Arc, RwLock},
5 time::Duration,
6};
7
8use ankurah_core::{
9 error::{MutationError, RetrievalError, StateError},
10 property::backend::backend_from_string,
11 storage::{StorageCollection, StorageEngine},
12};
13use ankurah_proto::{Attestation, AttestationSet, Attested, EntityState, EventId, OperationSet, State, StateBuffers};
14
15use futures_util::{pin_mut, TryStreamExt};
16
17pub mod sql_builder;
18pub mod value;
19
20use value::PGValue;
21
22use ankurah_proto::{Clock, CollectionId, EntityId, Event};
23use async_trait::async_trait;
24use bb8_postgres::{tokio_postgres::NoTls, PostgresConnectionManager};
25use tokio_postgres::{error::SqlState, types::ToSql};
26use tracing::{debug, error, info, warn};
27
28pub const DEFAULT_POOL_SIZE: u32 = 15;
31
32pub const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
34
35pub struct Postgres {
36 pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
37}
38
39impl Postgres {
40 pub fn new(pool: bb8::Pool<PostgresConnectionManager<NoTls>>) -> anyhow::Result<Self> { Ok(Self { pool }) }
41
42 pub async fn open(uri: &str) -> anyhow::Result<Self> {
43 let manager = PostgresConnectionManager::new_from_stringlike(uri, NoTls)?;
44 let pool = bb8::Pool::builder()
45 .max_size(DEFAULT_POOL_SIZE)
46 .connection_timeout(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS))
47 .build(manager)
48 .await?;
49 Self::new(pool)
50 }
51
52 pub fn sane_name(collection: &str) -> bool {
55 for char in collection.chars() {
56 match char {
57 char if char.is_alphanumeric() => {}
58 char if char.is_numeric() => {}
59 '_' | '.' | ':' => {}
60 _ => return false,
61 }
62 }
63
64 true
65 }
66}
67
68fn advisory_lock_key(identifier: &str) -> i64 {
70 let mut hasher = DefaultHasher::new();
71 identifier.hash(&mut hasher);
72 hasher.finish() as i64
73}
74
75async fn acquire_ddl_lock(client: &tokio_postgres::Client, collection_id: &str) -> Result<i64, StateError> {
77 let lock_key = advisory_lock_key(&format!("ankurah_ddl:{}", collection_id));
78 debug!("Acquiring advisory lock {} for collection {}", lock_key, collection_id);
79 client.execute("SELECT pg_advisory_lock($1)", &[&lock_key]).await.map_err(|err| {
80 error!("Failed to acquire advisory lock for {}: {:?}", collection_id, err);
81 StateError::DDLError(Box::new(err))
82 })?;
83 Ok(lock_key)
84}
85
86async fn release_ddl_lock(client: &tokio_postgres::Client, lock_key: i64) -> Result<(), StateError> {
88 debug!("Releasing advisory lock {}", lock_key);
89 client.execute("SELECT pg_advisory_unlock($1)", &[&lock_key]).await.map_err(|err| {
90 error!("Failed to release advisory lock {}: {:?}", lock_key, err);
91 StateError::DDLError(Box::new(err))
92 })?;
93 Ok(())
94}
95
96#[async_trait]
97impl StorageEngine for Postgres {
98 type Value = PGValue;
99
100 async fn collection(&self, collection_id: &CollectionId) -> Result<std::sync::Arc<dyn StorageCollection>, RetrievalError> {
101 if !Postgres::sane_name(collection_id.as_str()) {
102 return Err(RetrievalError::InvalidBucketName);
103 }
104
105 let mut client = self.pool.get().await.map_err(RetrievalError::storage)?;
106
107 let schema = client.query_one("SELECT current_database()", &[]).await.map_err(RetrievalError::storage)?;
109 let schema = schema.get("current_database");
110
111 let bucket = PostgresBucket {
112 pool: self.pool.clone(),
113 schema,
114 collection_id: collection_id.clone(),
115 columns: Arc::new(RwLock::new(Vec::new())),
116 };
117
118 let lock_key = acquire_ddl_lock(&client, collection_id.as_str()).await?;
120
121 let result = async {
123 bucket.create_state_table(&mut client).await?;
124 bucket.create_event_table(&mut client).await?;
125 bucket.rebuild_columns_cache(&mut client).await?;
126 Ok::<_, StateError>(())
127 }
128 .await;
129
130 release_ddl_lock(&client, lock_key).await?;
132
133 result?;
134 Ok(Arc::new(bucket))
135 }
136
137 async fn delete_all_collections(&self) -> Result<bool, MutationError> {
138 let mut client = self.pool.get().await.map_err(|err| MutationError::General(Box::new(err)))?;
139
140 let query = r#"
142 SELECT table_name
143 FROM information_schema.tables
144 WHERE table_schema = 'public'
145 "#;
146
147 let rows = client.query(query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
148 if rows.is_empty() {
149 return Ok(false);
150 }
151
152 let transaction = client.transaction().await.map_err(|err| MutationError::General(Box::new(err)))?;
154
155 for row in rows {
157 let table_name: String = row.get("table_name");
158 let drop_query = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
159 transaction.execute(&drop_query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
160 }
161
162 transaction.commit().await.map_err(|err| MutationError::General(Box::new(err)))?;
164
165 Ok(true)
166 }
167}
168
169#[derive(Clone, Debug)]
170pub struct PostgresColumn {
171 pub name: String,
172 pub is_nullable: bool,
173 pub data_type: String,
174}
175
176pub struct PostgresBucket {
177 pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
178 collection_id: CollectionId,
179 schema: String,
180 columns: Arc<RwLock<Vec<PostgresColumn>>>,
181}
182
183impl PostgresBucket {
184 fn state_table(&self) -> String { self.collection_id.as_str().to_string() }
185
186 pub fn event_table(&self) -> String { format!("{}_event", self.collection_id.as_str()) }
187
188 pub async fn rebuild_columns_cache(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
190 debug!("PostgresBucket({}).rebuild_columns_cache", self.collection_id);
191 let column_query =
192 r#"SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_catalog = $1 AND table_name = $2;"#
193 .to_string();
194 let mut new_columns = Vec::new();
195 debug!("Querying existing columns: {:?}, [{:?}, {:?}]", column_query, &self.schema, &self.collection_id.as_str());
196 let rows = client
197 .query(&column_query, &[&self.schema, &self.collection_id.as_str()])
198 .await
199 .map_err(|err| StateError::DDLError(Box::new(err)))?;
200 for row in rows {
201 let is_nullable: String = row.get("is_nullable");
202 new_columns.push(PostgresColumn {
203 name: row.get("column_name"),
204 is_nullable: is_nullable.eq("YES"),
205 data_type: row.get("data_type"),
206 })
207 }
208
209 let mut columns = self.columns.write().unwrap();
210 *columns = new_columns;
211 drop(columns);
212
213 Ok(())
214 }
215
216 pub fn existing_columns(&self) -> Vec<String> {
217 let columns = self.columns.read().unwrap();
218 columns.iter().map(|column| column.name.clone()).collect()
219 }
220
221 pub fn column(&self, column_name: &String) -> Option<PostgresColumn> {
222 let columns = self.columns.read().unwrap();
223 columns.iter().find(|column| column.name == *column_name).cloned()
224 }
225
226 pub fn has_column(&self, column_name: &String) -> bool { self.column(column_name).is_some() }
227
228 pub async fn create_event_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
229 let create_query = format!(
230 r#"CREATE TABLE IF NOT EXISTS "{}"(
231 "id" character(43) PRIMARY KEY,
232 "entity_id" character(22),
233 "operations" bytea,
234 "parent" character(43)[],
235 "attestations" bytea
236 )"#,
237 self.event_table()
238 );
239
240 debug!("{create_query}");
241 client.execute(&create_query, &[]).await.map_err(|e| StateError::DDLError(Box::new(e)))?;
242 Ok(())
243 }
244
245 pub async fn create_state_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
246 let create_query = format!(
247 r#"CREATE TABLE IF NOT EXISTS "{}"(
248 "id" character(22) PRIMARY KEY,
249 "state_buffer" BYTEA,
250 "head" character(43)[],
251 "attestations" BYTEA[]
252 )"#,
253 self.state_table()
254 );
255
256 debug!("{create_query}");
257 match client.execute(&create_query, &[]).await {
258 Ok(_) => Ok(()),
259 Err(err) => {
260 if let Some(db_err) = err.as_db_error() {
262 error!("PostgresBucket({}).create_state_table error: {} (code: {:?})", self.collection_id, db_err, db_err.code());
263 } else {
264 error!("PostgresBucket({}).create_state_table error: {:?}", self.collection_id, err);
265 }
266 Err(StateError::DDLError(Box::new(err)))
267 }
268 }
269 }
270
271 pub async fn add_missing_columns(
272 &self,
273 client: &mut tokio_postgres::Client,
274 missing: Vec<(String, &'static str)>, ) -> Result<(), StateError> {
276 if missing.is_empty() {
277 return Ok(());
278 }
279
280 let lock_key = acquire_ddl_lock(client, self.collection_id.as_str()).await?;
282
283 let result = async {
284 self.rebuild_columns_cache(client).await?;
286
287 for (column, datatype) in missing {
288 if Postgres::sane_name(&column) && !self.has_column(&column) {
289 let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, self.state_table(), column, datatype);
290 info!("PostgresBucket({}).add_missing_columns: {}", self.collection_id, alter_query);
291 match client.execute(&alter_query, &[]).await {
292 Ok(_) => {}
293 Err(err) => {
294 if let Some(db_err) = err.as_db_error() {
296 warn!(
297 "Error adding column {} to table {}: {} (code: {:?})",
298 column,
299 self.state_table(),
300 db_err,
301 db_err.code()
302 );
303 } else {
304 warn!("Error adding column {} to table {}: {:?}", column, self.state_table(), err);
305 }
306 self.rebuild_columns_cache(client).await?;
307 return Err(StateError::DDLError(Box::new(err)));
308 }
309 }
310 }
311 }
312
313 self.rebuild_columns_cache(client).await?;
314 Ok(())
315 }
316 .await;
317
318 release_ddl_lock(client, lock_key).await?;
320
321 result
322 }
323}
324
325#[async_trait]
326impl StorageCollection for PostgresBucket {
327 async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
328 let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
329 let attestations: Vec<Vec<u8>> = state.attestations.iter().map(bincode::serialize).collect::<Result<Vec<_>, _>>()?;
330 let id = state.payload.entity_id;
331
332 if state.payload.state.head.is_empty() {
334 warn!("Warning: Empty head detected for entity {}", id);
335 }
336
337 let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
338
339 let mut columns: Vec<String> = vec!["id".to_owned(), "state_buffer".to_owned(), "head".to_owned(), "attestations".to_owned()];
340 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
341 params.push(&id);
342 params.push(&state_buffers);
343 params.push(&state.payload.state.head);
344 params.push(&attestations);
345
346 let mut materialized: Vec<(String, Option<PGValue>)> = Vec::new();
347 let mut seen_properties = std::collections::HashSet::new();
348
349 for (name, state_buffer) in state.payload.state.state_buffers.iter() {
351 let backend = backend_from_string(name, Some(state_buffer))?;
352 for (column, value) in backend.property_values() {
353 if !seen_properties.insert(column.clone()) {
354 continue;
359 }
360
361 let pg_value: Option<PGValue> = value.map(|value| value.into());
362 if !self.has_column(&column) {
363 if let Some(ref pg_value) = pg_value {
365 self.add_missing_columns(&mut client, vec![(column.clone(), pg_value.postgres_type())]).await?;
366 } else {
367 continue;
371 }
372 }
373
374 materialized.push((column.clone(), pg_value));
375 }
376 }
377
378 for (name, parameter) in &materialized {
379 columns.push(name.clone());
380
381 match ¶meter {
382 Some(value) => match value {
383 PGValue::CharacterVarying(string) => params.push(string),
384 PGValue::SmallInt(number) => params.push(number),
385 PGValue::Integer(number) => params.push(number),
386 PGValue::BigInt(number) => params.push(number),
387 PGValue::DoublePrecision(float) => params.push(float),
388 PGValue::Bytea(bytes) => params.push(bytes),
389 PGValue::Boolean(bool) => params.push(bool),
390 },
391 None => params.push(&UntypedNull),
392 }
393 }
394
395 let columns_str = columns.iter().map(|name| format!("\"{}\"", name)).collect::<Vec<String>>().join(", ");
396 let values_str = params.iter().enumerate().map(|(index, _)| format!("${}", index + 1)).collect::<Vec<String>>().join(", ");
397 let columns_update_str = columns
398 .iter()
399 .enumerate()
400 .skip(1) .map(|(index, name)| format!("\"{}\" = ${}", name, index + 1))
402 .collect::<Vec<String>>()
403 .join(", ");
404
405 let query = format!(
407 r#"WITH old_state AS (
408 SELECT "head" FROM "{0}" WHERE "id" = $1
409 )
410 INSERT INTO "{0}"({1}) VALUES({2})
411 ON CONFLICT("id") DO UPDATE SET {3}
412 RETURNING (SELECT "head" FROM old_state) as old_head"#,
413 self.state_table(),
414 columns_str,
415 values_str,
416 columns_update_str
417 );
418
419 debug!("PostgresBucket({}).set_state: {}", self.collection_id, query);
420 let mut created_table = false;
421 let row = loop {
422 match client.query_one(&query, params.as_slice()).await {
423 Ok(row) => break row,
424 Err(err) => {
425 let kind = error_kind(&err);
426 if let ErrorKind::UndefinedTable { table } = kind {
427 if table == self.state_table() && !created_table {
428 self.create_state_table(&mut client).await?;
429 created_table = true;
430 continue; }
432 }
433 return Err(StateError::DDLError(Box::new(err)).into());
434 }
435 }
436 };
437
438 let old_head: Option<Clock> = row.get("old_head");
440 let changed = match old_head {
441 None => true, Some(old_head) => old_head != state.payload.state.head,
443 };
444
445 debug!("PostgresBucket({}).set_state: Changed: {}", self.collection_id, changed);
446 Ok(changed)
447 }
448
449 async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
450 let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = $1"#, self.state_table());
452
453 let mut client = match self.pool.get().await {
454 Ok(client) => client,
455 Err(err) => {
456 return Err(RetrievalError::StorageError(err.into()));
457 }
458 };
459
460 debug!("PostgresBucket({}).get_state: {}", self.collection_id, query);
461 let rows = match client.query(&query, &[&id]).await {
462 Ok(rows) => rows,
463 Err(err) => {
464 let kind = error_kind(&err);
465 if let ErrorKind::UndefinedTable { table } = kind {
466 if table == self.state_table() {
467 self.create_state_table(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
468 return Err(RetrievalError::EntityNotFound(id));
469 }
470 }
471 return Err(RetrievalError::StorageError(err.into()));
472 }
473 };
474
475 let row = match rows.into_iter().next() {
476 Some(row) => row,
477 None => return Err(RetrievalError::EntityNotFound(id)),
478 };
479
480 debug!("PostgresBucket({}).get_state: Row: {:?}", self.collection_id, row);
481 let row_id: EntityId = row.try_get("id").map_err(RetrievalError::storage)?;
482 assert_eq!(row_id, id);
483
484 let serialized_buffers: Vec<u8> = row.try_get("state_buffer").map_err(RetrievalError::storage)?;
485 let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&serialized_buffers).map_err(RetrievalError::storage)?;
486 let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
487 let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
488 let attestations = attestation_bytes
489 .into_iter()
490 .map(|bytes| bincode::deserialize(&bytes))
491 .collect::<Result<Vec<Attestation>, _>>()
492 .map_err(RetrievalError::storage)?;
493
494 Ok(Attested {
495 payload: EntityState {
496 entity_id: id,
497 collection: self.collection_id.clone(),
498 state: State { state_buffers: StateBuffers(state_buffers), head },
499 },
500 attestations: AttestationSet(attestations),
501 })
502 }
503
504 async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
505 debug!("fetch_states: {:?}", selection);
506 let mut client = self.pool.get().await.map_err(|err| RetrievalError::StorageError(Box::new(err)))?;
507
508 let referenced = selection.referenced_columns();
513 let cached = self.existing_columns();
514 let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();
515
516 if !unknown_to_cache.is_empty() {
518 debug!("PostgresBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
519 self.rebuild_columns_cache(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
520 }
521
522 let existing = self.existing_columns();
524 let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();
525
526 let effective_selection = if missing.is_empty() {
527 selection.clone()
528 } else {
529 debug!("PostgresBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
530 selection.assume_null(&missing)
531 };
532
533 let mut results = Vec::new();
534 let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
535 builder.table_name(self.state_table());
536 builder.selection(&effective_selection)?;
537
538 let (sql, args) = builder.build()?;
539 debug!("PostgresBucket({}).fetch_states: SQL: {} with args: {:?}", self.collection_id, sql, args);
540
541 let stream = match client.query_raw(&sql, args).await {
542 Ok(stream) => stream,
543 Err(err) => {
544 let kind = error_kind(&err);
545 if let ErrorKind::UndefinedTable { table } = kind {
546 if table == self.state_table() {
547 return Ok(Vec::new());
549 }
550 }
551 return Err(RetrievalError::StorageError(err.into()));
552 }
553 };
554 pin_mut!(stream);
555
556 while let Some(row) = stream.try_next().await.map_err(RetrievalError::storage)? {
557 let id: EntityId = row.try_get(0).map_err(RetrievalError::storage)?;
558 let state_buffer: Vec<u8> = row.try_get(1).map_err(RetrievalError::storage)?;
559 let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(RetrievalError::storage)?;
560 let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
561 let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
562 let attestations = attestation_bytes
563 .into_iter()
564 .map(|bytes| bincode::deserialize(&bytes))
565 .collect::<Result<Vec<Attestation>, _>>()
566 .map_err(RetrievalError::storage)?;
567
568 results.push(Attested {
569 payload: EntityState {
570 entity_id: id,
571 collection: self.collection_id.clone(),
572 state: State { state_buffers: StateBuffers(state_buffers), head },
573 },
574 attestations: AttestationSet(attestations),
575 });
576 }
577
578 Ok(results)
579 }
580
581 async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
582 let operations = bincode::serialize(&entity_event.payload.operations)?;
583 let attestations = bincode::serialize(&entity_event.attestations)?;
584
585 let query = format!(
586 r#"INSERT INTO "{0}"("id", "entity_id", "operations", "parent", "attestations") VALUES($1, $2, $3, $4, $5)
587 ON CONFLICT ("id") DO NOTHING"#,
588 self.event_table(),
589 );
590
591 let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
592 debug!("PostgresBucket({}).add_event: {}", self.collection_id, query);
593 let mut created_table = false;
594 let affected = loop {
595 match client
596 .execute(
597 &query,
598 &[
599 &entity_event.payload.id(),
600 &entity_event.payload.entity_id,
601 &operations,
602 &entity_event.payload.parent,
603 &attestations,
604 ],
605 )
606 .await
607 {
608 Ok(affected) => break affected,
609 Err(err) => {
610 let kind = error_kind(&err);
611 if let ErrorKind::UndefinedTable { table } = kind {
612 if table == self.event_table() && !created_table {
613 self.create_event_table(&mut client).await?;
614 created_table = true;
615 continue; }
617 }
618 error!("PostgresBucket({}).add_event: Error: {:?}", self.collection_id, err);
619 return Err(StateError::DMLError(Box::new(err)).into());
620 }
621 }
622 };
623
624 Ok(affected > 0)
625 }
626
627 async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
628 if event_ids.is_empty() {
629 return Ok(Vec::new());
630 }
631
632 let query = format!(
633 r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{0}" WHERE "id" = ANY($1)"#,
634 self.event_table(),
635 );
636
637 let client = self.pool.get().await.map_err(RetrievalError::storage)?;
638 let rows = match client.query(&query, &[&event_ids]).await {
639 Ok(rows) => rows,
640 Err(err) => {
641 let kind = error_kind(&err);
642 match kind {
643 ErrorKind::UndefinedTable { table } if table == self.event_table() => return Ok(Vec::new()),
644 _ => return Err(RetrievalError::storage(err)),
645 }
646 }
647 };
648
649 let mut events = Vec::new();
650 for row in rows {
651 let entity_id: EntityId = row.try_get("entity_id").map_err(RetrievalError::storage)?;
652 let operations: OperationSet = row.try_get("operations").map_err(RetrievalError::storage)?;
653 let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
654 let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
655 let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;
656
657 let event = Attested {
658 payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
659 attestations: AttestationSet(attestations),
660 };
661 events.push(event);
662 }
663 Ok(events)
664 }
665
666 async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, ankurah_core::error::RetrievalError> {
667 let query =
668 format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{0}" WHERE "entity_id" = $1"#, self.event_table(),);
669
670 let client = self.pool.get().await.map_err(RetrievalError::storage)?;
671 debug!("PostgresBucket({}).get_events: {}", self.collection_id, query);
672 let rows = match client.query(&query, &[&entity_id]).await {
673 Ok(rows) => rows,
674 Err(err) => {
675 let kind = error_kind(&err);
676 if let ErrorKind::UndefinedTable { table } = kind {
677 if table == self.event_table() {
678 return Ok(Vec::new());
679 }
680 }
681
682 return Err(RetrievalError::storage(err));
683 }
684 };
685
686 let mut events = Vec::new();
687 for row in rows {
688 let operations_binary: Vec<u8> = row.try_get("operations").map_err(RetrievalError::storage)?;
690 let operations = bincode::deserialize(&operations_binary).map_err(RetrievalError::storage)?;
691 let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
692 let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
693 let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;
694
695 events.push(Attested {
696 payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
697 attestations: AttestationSet(attestations),
698 });
699 }
700
701 Ok(events)
702 }
703}
704
705#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
709pub enum ErrorKind {
710 RowCount,
711 UndefinedTable { table: String },
712 UndefinedColumn { table: Option<String>, column: String },
713 Unknown,
714 PostgresError(String),
715}
716
717pub fn error_kind(err: &tokio_postgres::Error) -> ErrorKind {
718 let string = err.as_db_error().map(|e| e.message()).unwrap_or_default().trim().to_owned();
719 let _db_error = err.as_db_error();
720 let sql_code = err.code().cloned();
721
722 let err_string = err.to_string();
724 if err_string.contains("query returned an unexpected number of rows") || string == "query returned an unexpected number of rows" {
725 return ErrorKind::RowCount;
726 }
727
728 debug!("postgres error: {:?}", err);
735
736 let quote_indices = |s: &str| {
737 let mut quotes = Vec::new();
738 for (index, char) in s.char_indices() {
739 if char == '"' {
740 quotes.push(index)
741 }
742 }
743 quotes
744 };
745
746 match sql_code {
747 Some(SqlState::UNDEFINED_TABLE) => {
748 let quotes = quote_indices(&string);
750 if quotes.len() >= 2 {
751 let table = &string[quotes[0] + 1..quotes[1]];
752 ErrorKind::UndefinedTable { table: table.to_owned() }
753 } else {
754 ErrorKind::PostgresError(string.clone())
755 }
756 }
757 Some(SqlState::UNDEFINED_COLUMN) => {
758 let quotes = quote_indices(&string);
762 if quotes.len() >= 2 {
763 let column = string[quotes[0] + 1..quotes[1]].to_owned();
764
765 let table = if quotes.len() >= 4 {
766 Some(string[quotes[2] + 1..quotes[3]].to_owned())
768 } else {
769 None
771 };
772
773 ErrorKind::UndefinedColumn { table, column }
774 } else {
775 ErrorKind::PostgresError(string.clone())
776 }
777 }
778 _ => ErrorKind::Unknown,
779 }
780}
781
782#[allow(unused)]
783pub struct MissingMaterialized {
784 pub name: String,
785}
786
787use bytes::BytesMut;
788use tokio_postgres::types::{to_sql_checked, IsNull, Type};
789
790use crate::sql_builder::SqlBuilder;
791
792#[derive(Debug)]
793struct UntypedNull;
794
795impl ToSql for UntypedNull {
796 fn to_sql(&self, _ty: &Type, _out: &mut BytesMut) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> { Ok(IsNull::Yes) }
797
798 fn accepts(_ty: &Type) -> bool {
799 true }
801
802 to_sql_checked!();
803}