//! PostgreSQL storage engine for Ankurah (`ankurah_storage_postgres/lib.rs`).

1use std::{
2    collections::{hash_map::DefaultHasher, BTreeMap},
3    hash::{Hash, Hasher},
4    sync::{Arc, RwLock},
5    time::Duration,
6};
7
8use ankurah_core::{
9    error::{MutationError, RetrievalError, StateError},
10    property::backend::backend_from_string,
11    storage::{StorageCollection, StorageEngine},
12};
13use ankurah_proto::{Attestation, AttestationSet, Attested, EntityState, EventId, OperationSet, State, StateBuffers};
14
15use futures_util::{pin_mut, TryStreamExt};
16
17pub mod sql_builder;
18pub mod value;
19
20use value::PGValue;
21
22use ankurah_proto::{Clock, CollectionId, EntityId, Event};
23use async_trait::async_trait;
24use bb8_postgres::{tokio_postgres::NoTls, PostgresConnectionManager};
25use tokio_postgres::{error::SqlState, types::ToSql};
26use tracing::{debug, error, info, warn};
27
/// Default connection pool size for `Postgres::open()`.
/// Production applications should configure their own pool via `Postgres::new()`.
pub const DEFAULT_POOL_SIZE: u32 = 15;

/// Default connection timeout in seconds, used by `Postgres::open()`.
pub const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
34
/// PostgreSQL-backed storage engine wrapping a bb8 connection pool.
pub struct Postgres {
    // Shared pool; cloned into each PostgresBucket so collections can
    // check out connections independently.
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
38
39impl Postgres {
40    pub fn new(pool: bb8::Pool<PostgresConnectionManager<NoTls>>) -> anyhow::Result<Self> { Ok(Self { pool }) }
41
42    pub async fn open(uri: &str) -> anyhow::Result<Self> {
43        let manager = PostgresConnectionManager::new_from_stringlike(uri, NoTls)?;
44        let pool = bb8::Pool::builder()
45            .max_size(DEFAULT_POOL_SIZE)
46            .connection_timeout(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS))
47            .build(manager)
48            .await?;
49        Self::new(pool)
50    }
51
52    // TODO: newtype this to `BucketName(&str)` with a constructor that
53    // only accepts a subset of characters.
54    pub fn sane_name(collection: &str) -> bool {
55        for char in collection.chars() {
56            match char {
57                char if char.is_alphanumeric() => {}
58                char if char.is_numeric() => {}
59                '_' | '.' | ':' => {}
60                _ => return false,
61            }
62        }
63
64        true
65    }
66}
67
/// Compute advisory lock key from a string identifier
///
/// Hashes the identifier with the std `DefaultHasher` and reinterprets the
/// 64-bit digest as the signed key that Postgres advisory locks expect.
fn advisory_lock_key(identifier: &str) -> i64 {
    let mut digest = DefaultHasher::default();
    identifier.hash(&mut digest);
    digest.finish() as i64
}
74
75/// Acquire a PostgreSQL advisory lock for DDL operations on a collection
76async fn acquire_ddl_lock(client: &tokio_postgres::Client, collection_id: &str) -> Result<i64, StateError> {
77    let lock_key = advisory_lock_key(&format!("ankurah_ddl:{}", collection_id));
78    debug!("Acquiring advisory lock {} for collection {}", lock_key, collection_id);
79    client.execute("SELECT pg_advisory_lock($1)", &[&lock_key]).await.map_err(|err| {
80        error!("Failed to acquire advisory lock for {}: {:?}", collection_id, err);
81        StateError::DDLError(Box::new(err))
82    })?;
83    Ok(lock_key)
84}
85
86/// Release a PostgreSQL advisory lock
87async fn release_ddl_lock(client: &tokio_postgres::Client, lock_key: i64) -> Result<(), StateError> {
88    debug!("Releasing advisory lock {}", lock_key);
89    client.execute("SELECT pg_advisory_unlock($1)", &[&lock_key]).await.map_err(|err| {
90        error!("Failed to release advisory lock {}: {:?}", lock_key, err);
91        StateError::DDLError(Box::new(err))
92    })?;
93    Ok(())
94}
95
96#[async_trait]
97impl StorageEngine for Postgres {
98    type Value = PGValue;
99
100    async fn collection(&self, collection_id: &CollectionId) -> Result<std::sync::Arc<dyn StorageCollection>, RetrievalError> {
101        if !Postgres::sane_name(collection_id.as_str()) {
102            return Err(RetrievalError::InvalidBucketName);
103        }
104
105        let mut client = self.pool.get().await.map_err(RetrievalError::storage)?;
106
107        // get the current schema from the database
108        let schema = client.query_one("SELECT current_database()", &[]).await.map_err(RetrievalError::storage)?;
109        let schema = schema.get("current_database");
110
111        let bucket = PostgresBucket {
112            pool: self.pool.clone(),
113            schema,
114            collection_id: collection_id.clone(),
115            columns: Arc::new(RwLock::new(Vec::new())),
116        };
117
118        // Acquire advisory lock to serialize DDL operations for this collection
119        let lock_key = acquire_ddl_lock(&client, collection_id.as_str()).await?;
120
121        // Create tables if they don't exist (protected by advisory lock)
122        let result = async {
123            bucket.create_state_table(&mut client).await?;
124            bucket.create_event_table(&mut client).await?;
125            bucket.rebuild_columns_cache(&mut client).await?;
126            Ok::<_, StateError>(())
127        }
128        .await;
129
130        // Always release the lock, even if DDL failed
131        release_ddl_lock(&client, lock_key).await?;
132
133        result?;
134        Ok(Arc::new(bucket))
135    }
136
137    async fn delete_all_collections(&self) -> Result<bool, MutationError> {
138        let mut client = self.pool.get().await.map_err(|err| MutationError::General(Box::new(err)))?;
139
140        // Get all tables in the public schema
141        let query = r#"
142            SELECT table_name 
143            FROM information_schema.tables 
144            WHERE table_schema = 'public'
145        "#;
146
147        let rows = client.query(query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
148        if rows.is_empty() {
149            return Ok(false);
150        }
151
152        // Start a transaction to drop all tables atomically
153        let transaction = client.transaction().await.map_err(|err| MutationError::General(Box::new(err)))?;
154
155        // Drop each table
156        for row in rows {
157            let table_name: String = row.get("table_name");
158            let drop_query = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
159            transaction.execute(&drop_query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
160        }
161
162        // Commit the transaction
163        transaction.commit().await.map_err(|err| MutationError::General(Box::new(err)))?;
164
165        Ok(true)
166    }
167}
168
/// Cached description of one state-table column, as reported by
/// `information_schema.columns`.
#[derive(Clone, Debug)]
pub struct PostgresColumn {
    // Column name.
    pub name: String,
    // True when information_schema reported is_nullable = "YES".
    pub is_nullable: bool,
    // Postgres data type name as reported by information_schema.
    pub data_type: String,
}
175
/// Storage for one collection: a state table plus a companion `_event`
/// table, sharing the engine's connection pool.
pub struct PostgresBucket {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
    collection_id: CollectionId,
    // Database name (from `SELECT current_database()`), used as
    // `table_catalog` when querying information_schema.
    schema: String,
    // Cached column metadata for the state table; rebuilt via
    // `rebuild_columns_cache`.
    columns: Arc<RwLock<Vec<PostgresColumn>>>,
}
182
183impl PostgresBucket {
184    fn state_table(&self) -> String { self.collection_id.as_str().to_string() }
185
186    pub fn event_table(&self) -> String { format!("{}_event", self.collection_id.as_str()) }
187
188    /// Rebuild the cache of columns in the table.
189    pub async fn rebuild_columns_cache(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
190        debug!("PostgresBucket({}).rebuild_columns_cache", self.collection_id);
191        let column_query =
192            r#"SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_catalog = $1 AND table_name = $2;"#
193                .to_string();
194        let mut new_columns = Vec::new();
195        debug!("Querying existing columns: {:?}, [{:?}, {:?}]", column_query, &self.schema, &self.collection_id.as_str());
196        let rows = client
197            .query(&column_query, &[&self.schema, &self.collection_id.as_str()])
198            .await
199            .map_err(|err| StateError::DDLError(Box::new(err)))?;
200        for row in rows {
201            let is_nullable: String = row.get("is_nullable");
202            new_columns.push(PostgresColumn {
203                name: row.get("column_name"),
204                is_nullable: is_nullable.eq("YES"),
205                data_type: row.get("data_type"),
206            })
207        }
208
209        let mut columns = self.columns.write().unwrap();
210        *columns = new_columns;
211        drop(columns);
212
213        Ok(())
214    }
215
216    pub fn existing_columns(&self) -> Vec<String> {
217        let columns = self.columns.read().unwrap();
218        columns.iter().map(|column| column.name.clone()).collect()
219    }
220
221    pub fn column(&self, column_name: &String) -> Option<PostgresColumn> {
222        let columns = self.columns.read().unwrap();
223        columns.iter().find(|column| column.name == *column_name).cloned()
224    }
225
226    pub fn has_column(&self, column_name: &String) -> bool { self.column(column_name).is_some() }
227
228    pub async fn create_event_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
229        let create_query = format!(
230            r#"CREATE TABLE IF NOT EXISTS "{}"(
231                "id" character(43) PRIMARY KEY,
232                "entity_id" character(22),
233                "operations" bytea,
234                "parent" character(43)[],
235                "attestations" bytea
236            )"#,
237            self.event_table()
238        );
239
240        debug!("{create_query}");
241        client.execute(&create_query, &[]).await.map_err(|e| StateError::DDLError(Box::new(e)))?;
242        Ok(())
243    }
244
245    pub async fn create_state_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
246        let create_query = format!(
247            r#"CREATE TABLE IF NOT EXISTS "{}"(
248                "id" character(22) PRIMARY KEY,
249                "state_buffer" BYTEA,
250                "head" character(43)[],
251                "attestations" BYTEA[]
252            )"#,
253            self.state_table()
254        );
255
256        debug!("{create_query}");
257        match client.execute(&create_query, &[]).await {
258            Ok(_) => Ok(()),
259            Err(err) => {
260                // Log full error details for debugging
261                if let Some(db_err) = err.as_db_error() {
262                    error!("PostgresBucket({}).create_state_table error: {} (code: {:?})", self.collection_id, db_err, db_err.code());
263                } else {
264                    error!("PostgresBucket({}).create_state_table error: {:?}", self.collection_id, err);
265                }
266                Err(StateError::DDLError(Box::new(err)))
267            }
268        }
269    }
270
271    pub async fn add_missing_columns(
272        &self,
273        client: &mut tokio_postgres::Client,
274        missing: Vec<(String, &'static str)>, // column name, datatype
275    ) -> Result<(), StateError> {
276        if missing.is_empty() {
277            return Ok(());
278        }
279
280        // Acquire advisory lock to serialize DDL operations for this collection
281        let lock_key = acquire_ddl_lock(client, self.collection_id.as_str()).await?;
282
283        let result = async {
284            // Re-check columns after acquiring lock (another session may have added them)
285            self.rebuild_columns_cache(client).await?;
286
287            for (column, datatype) in missing {
288                if Postgres::sane_name(&column) && !self.has_column(&column) {
289                    let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, self.state_table(), column, datatype);
290                    info!("PostgresBucket({}).add_missing_columns: {}", self.collection_id, alter_query);
291                    match client.execute(&alter_query, &[]).await {
292                        Ok(_) => {}
293                        Err(err) => {
294                            // Log full error details for debugging
295                            if let Some(db_err) = err.as_db_error() {
296                                warn!(
297                                    "Error adding column {} to table {}: {} (code: {:?})",
298                                    column,
299                                    self.state_table(),
300                                    db_err,
301                                    db_err.code()
302                                );
303                            } else {
304                                warn!("Error adding column {} to table {}: {:?}", column, self.state_table(), err);
305                            }
306                            self.rebuild_columns_cache(client).await?;
307                            return Err(StateError::DDLError(Box::new(err)));
308                        }
309                    }
310                }
311            }
312
313            self.rebuild_columns_cache(client).await?;
314            Ok(())
315        }
316        .await;
317
318        // Always release the lock
319        release_ddl_lock(client, lock_key).await?;
320
321        result
322    }
323}
324
#[async_trait]
impl StorageCollection for PostgresBucket {
    /// Upsert an entity's state row, materializing property values into
    /// dedicated typed columns. Returns `true` when the stored head changed
    /// (including the brand-new-entity case).
    async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
        let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
        let attestations: Vec<Vec<u8>> = state.attestations.iter().map(bincode::serialize).collect::<Result<Vec<_>, _>>()?;
        let id = state.payload.entity_id;

        // Ensure head is not empty for new records
        if state.payload.state.head.is_empty() {
            warn!("Warning: Empty head detected for entity {}", id);
        }

        let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;

        // Fixed columns first; materialized property columns are appended below.
        // `params` must stay index-aligned with `columns` ($1 = id, $2 = state_buffer, ...).
        let mut columns: Vec<String> = vec!["id".to_owned(), "state_buffer".to_owned(), "head".to_owned(), "attestations".to_owned()];
        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
        params.push(&id);
        params.push(&state_buffers);
        params.push(&state.payload.state.head);
        params.push(&attestations);

        let mut materialized: Vec<(String, Option<PGValue>)> = Vec::new();
        let mut seen_properties = std::collections::HashSet::new();

        // Process property values directly from state buffers
        for (name, state_buffer) in state.payload.state.state_buffers.iter() {
            let backend = backend_from_string(name, Some(state_buffer))?;
            for (column, value) in backend.property_values() {
                if !seen_properties.insert(column.clone()) {
                    // Skip if property already seen in another backend
                    // TODO: this should cause all (or subsequent?) fields with the same name
                    // to be suffixed with the property id when we have property ids
                    // requires some thought (and field metadata) on how to do this right
                    continue;
                }

                let pg_value: Option<PGValue> = value.map(|value| value.into());
                if !self.has_column(&column) {
                    // We don't have the column yet and we know the type.
                    if let Some(ref pg_value) = pg_value {
                        self.add_missing_columns(&mut client, vec![(column.clone(), pg_value.postgres_type())]).await?;
                    } else {
                        // The column doesn't exist yet and we don't have a value.
                        // This means the entire column is already null/none so we
                        // don't need to set anything.
                        continue;
                    }
                }

                materialized.push((column.clone(), pg_value));
            }
        }

        // Append materialized property values, keeping params aligned with columns.
        for (name, parameter) in &materialized {
            columns.push(name.clone());

            match &parameter {
                Some(value) => match value {
                    PGValue::CharacterVarying(string) => params.push(string),
                    PGValue::SmallInt(number) => params.push(number),
                    PGValue::Integer(number) => params.push(number),
                    PGValue::BigInt(number) => params.push(number),
                    PGValue::DoublePrecision(float) => params.push(float),
                    PGValue::Bytea(bytes) => params.push(bytes),
                    PGValue::Boolean(bool) => params.push(bool),
                },
                // Absent value: bind an untyped NULL so placeholder count still matches.
                None => params.push(&UntypedNull),
            }
        }

        let columns_str = columns.iter().map(|name| format!("\"{}\"", name)).collect::<Vec<String>>().join(", ");
        let values_str = params.iter().enumerate().map(|(index, _)| format!("${}", index + 1)).collect::<Vec<String>>().join(", ");
        let columns_update_str = columns
            .iter()
            .enumerate()
            .skip(1) // Skip "id"
            .map(|(index, name)| format!("\"{}\" = ${}", name, index + 1))
            .collect::<Vec<String>>()
            .join(", ");

        // be careful with sql injection via bucket name
        // The CTE captures the pre-upsert head so we can tell whether this write changed anything.
        let query = format!(
            r#"WITH old_state AS (
                SELECT "head" FROM "{0}" WHERE "id" = $1
            )
            INSERT INTO "{0}"({1}) VALUES({2})
            ON CONFLICT("id") DO UPDATE SET {3}
            RETURNING (SELECT "head" FROM old_state) as old_head"#,
            self.state_table(),
            columns_str,
            values_str,
            columns_update_str
        );

        debug!("PostgresBucket({}).set_state: {}", self.collection_id, query);
        // Lazily create the state table on first write: if the upsert fails
        // because the table is missing, create it and retry exactly once.
        let mut created_table = false;
        let row = loop {
            match client.query_one(&query, params.as_slice()).await {
                Ok(row) => break row,
                Err(err) => {
                    let kind = error_kind(&err);
                    if let ErrorKind::UndefinedTable { table } = kind {
                        if table == self.state_table() && !created_table {
                            self.create_state_table(&mut client).await?;
                            created_table = true;
                            continue; // retry exactly once
                        }
                    }
                    return Err(StateError::DDLError(Box::new(err)).into());
                }
            }
        };

        // If this is a new entity (no old_head), or if the heads are different, return true
        let old_head: Option<Clock> = row.get("old_head");
        let changed = match old_head {
            None => true, // New entity
            Some(old_head) => old_head != state.payload.state.head,
        };

        debug!("PostgresBucket({}).set_state: Changed: {}", self.collection_id, changed);
        Ok(changed)
    }

    /// Fetch a single entity's state row by id.
    ///
    /// If the state table doesn't exist yet, it is created and the entity is
    /// reported as not found (so subsequent calls can succeed).
    async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
        // be careful with sql injection via bucket name
        let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = $1"#, self.state_table());

        let mut client = match self.pool.get().await {
            Ok(client) => client,
            Err(err) => {
                return Err(RetrievalError::StorageError(err.into()));
            }
        };

        debug!("PostgresBucket({}).get_state: {}", self.collection_id, query);
        let rows = match client.query(&query, &[&id]).await {
            Ok(rows) => rows,
            Err(err) => {
                let kind = error_kind(&err);
                if let ErrorKind::UndefinedTable { table } = kind {
                    if table == self.state_table() {
                        // Missing table: create it now, then report not-found.
                        self.create_state_table(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
                        return Err(RetrievalError::EntityNotFound(id));
                    }
                }
                return Err(RetrievalError::StorageError(err.into()));
            }
        };

        let row = match rows.into_iter().next() {
            Some(row) => row,
            None => return Err(RetrievalError::EntityNotFound(id)),
        };

        debug!("PostgresBucket({}).get_state: Row: {:?}", self.collection_id, row);
        let row_id: EntityId = row.try_get("id").map_err(RetrievalError::storage)?;
        assert_eq!(row_id, id);

        // state_buffer holds a bincode-encoded map of backend name -> buffer.
        let serialized_buffers: Vec<u8> = row.try_get("state_buffer").map_err(RetrievalError::storage)?;
        let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&serialized_buffers).map_err(RetrievalError::storage)?;
        let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
        // attestations is a BYTEA[]; each element is one bincode-encoded Attestation.
        let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
        let attestations = attestation_bytes
            .into_iter()
            .map(|bytes| bincode::deserialize(&bytes))
            .collect::<Result<Vec<Attestation>, _>>()
            .map_err(RetrievalError::storage)?;

        Ok(Attested {
            payload: EntityState {
                entity_id: id,
                collection: self.collection_id.clone(),
                state: State { state_buffers: StateBuffers(state_buffers), head },
            },
            attestations: AttestationSet(attestations),
        })
    }

    /// Fetch all entity states matching `selection`.
    ///
    /// Columns referenced by the selection but absent from the table are
    /// treated as NULL (after a cache refresh), so queries against
    /// not-yet-materialized properties don't error. A missing table yields
    /// an empty result set.
    async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
        debug!("fetch_states: {:?}", selection);
        let mut client = self.pool.get().await.map_err(|err| RetrievalError::StorageError(Box::new(err)))?;

        // Pre-filter selection based on cached schema to avoid undefined column errors.
        // If we see columns not in our cache, refresh it first (they might have been added).
        // TODO: Once property metadata is in the system catalog, we can create missing columns
        // on-demand here instead of refreshing the cache each time we see unknown columns.
        let referenced = selection.referenced_columns();
        let cached = self.existing_columns();
        let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();

        // Refresh cache if we see columns we haven't seen before
        if !unknown_to_cache.is_empty() {
            debug!("PostgresBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
            self.rebuild_columns_cache(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
        }

        // Now check with (possibly refreshed) cache - columns still missing truly don't exist
        let existing = self.existing_columns();
        let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();

        let effective_selection = if missing.is_empty() {
            selection.clone()
        } else {
            debug!("PostgresBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
            selection.assume_null(&missing)
        };

        let mut results = Vec::new();
        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
        builder.table_name(self.state_table());
        builder.selection(&effective_selection)?;

        let (sql, args) = builder.build()?;
        debug!("PostgresBucket({}).fetch_states: SQL: {} with args: {:?}", self.collection_id, sql, args);

        let stream = match client.query_raw(&sql, args).await {
            Ok(stream) => stream,
            Err(err) => {
                let kind = error_kind(&err);
                if let ErrorKind::UndefinedTable { table } = kind {
                    if table == self.state_table() {
                        // Table doesn't exist yet, return empty results
                        return Ok(Vec::new());
                    }
                }
                return Err(RetrievalError::StorageError(err.into()));
            }
        };
        pin_mut!(stream);

        // Stream rows instead of materializing the whole result set at once.
        while let Some(row) = stream.try_next().await.map_err(RetrievalError::storage)? {
            let id: EntityId = row.try_get(0).map_err(RetrievalError::storage)?;
            let state_buffer: Vec<u8> = row.try_get(1).map_err(RetrievalError::storage)?;
            let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(RetrievalError::storage)?;
            let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
            let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
            let attestations = attestation_bytes
                .into_iter()
                .map(|bytes| bincode::deserialize(&bytes))
                .collect::<Result<Vec<Attestation>, _>>()
                .map_err(RetrievalError::storage)?;

            results.push(Attested {
                payload: EntityState {
                    entity_id: id,
                    collection: self.collection_id.clone(),
                    state: State { state_buffers: StateBuffers(state_buffers), head },
                },
                attestations: AttestationSet(attestations),
            });
        }

        Ok(results)
    }

    /// Insert an event into the event table; duplicate ids are ignored via
    /// `ON CONFLICT DO NOTHING`. Returns `true` if a row was actually inserted.
    async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
        let operations = bincode::serialize(&entity_event.payload.operations)?;
        let attestations = bincode::serialize(&entity_event.attestations)?;

        let query = format!(
            r#"INSERT INTO "{0}"("id", "entity_id", "operations", "parent", "attestations") VALUES($1, $2, $3, $4, $5)
               ON CONFLICT ("id") DO NOTHING"#,
            self.event_table(),
        );

        let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
        debug!("PostgresBucket({}).add_event: {}", self.collection_id, query);
        // Lazily create the event table on first write: retry exactly once.
        let mut created_table = false;
        let affected = loop {
            match client
                .execute(
                    &query,
                    &[
                        &entity_event.payload.id(),
                        &entity_event.payload.entity_id,
                        &operations,
                        &entity_event.payload.parent,
                        &attestations,
                    ],
                )
                .await
            {
                Ok(affected) => break affected,
                Err(err) => {
                    let kind = error_kind(&err);
                    if let ErrorKind::UndefinedTable { table } = kind {
                        if table == self.event_table() && !created_table {
                            self.create_event_table(&mut client).await?;
                            created_table = true;
                            continue; // retry exactly once
                        }
                    }
                    error!("PostgresBucket({}).add_event: Error: {:?}", self.collection_id, err);
                    return Err(StateError::DMLError(Box::new(err)).into());
                }
            }
        };

        Ok(affected > 0)
    }

    /// Fetch the given events by id. A missing event table (or empty id list)
    /// yields an empty Vec rather than an error.
    async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
        if event_ids.is_empty() {
            return Ok(Vec::new());
        }

        let query = format!(
            r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{0}" WHERE "id" = ANY($1)"#,
            self.event_table(),
        );

        let client = self.pool.get().await.map_err(RetrievalError::storage)?;
        let rows = match client.query(&query, &[&event_ids]).await {
            Ok(rows) => rows,
            Err(err) => {
                let kind = error_kind(&err);
                match kind {
                    ErrorKind::UndefinedTable { table } if table == self.event_table() => return Ok(Vec::new()),
                    _ => return Err(RetrievalError::storage(err)),
                }
            }
        };

        let mut events = Vec::new();
        for row in rows {
            let entity_id: EntityId = row.try_get("entity_id").map_err(RetrievalError::storage)?;
            // NOTE(review): this decodes "operations" via OperationSet's FromSql impl,
            // while dump_entity_events reads raw bytes and bincode-deserializes them.
            // add_event writes bincode bytes — confirm both paths decode that format.
            let operations: OperationSet = row.try_get("operations").map_err(RetrievalError::storage)?;
            let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
            let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
            let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;

            let event = Attested {
                payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
                attestations: AttestationSet(attestations),
            };
            events.push(event);
        }
        Ok(events)
    }

    /// Fetch every stored event for a single entity. A missing event table
    /// yields an empty Vec rather than an error.
    async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, ankurah_core::error::RetrievalError> {
        let query =
            format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{0}" WHERE "entity_id" = $1"#, self.event_table(),);

        let client = self.pool.get().await.map_err(RetrievalError::storage)?;
        // NOTE(review): log label says "get_events" but this is dump_entity_events.
        debug!("PostgresBucket({}).get_events: {}", self.collection_id, query);
        let rows = match client.query(&query, &[&entity_id]).await {
            Ok(rows) => rows,
            Err(err) => {
                let kind = error_kind(&err);
                if let ErrorKind::UndefinedTable { table } = kind {
                    if table == self.event_table() {
                        return Ok(Vec::new());
                    }
                }

                return Err(RetrievalError::storage(err));
            }
        };

        let mut events = Vec::new();
        for row in rows {
            // let event_id: EventId = row.try_get("id").map_err(|err| RetrievalError::storage(err))?;
            let operations_binary: Vec<u8> = row.try_get("operations").map_err(RetrievalError::storage)?;
            let operations = bincode::deserialize(&operations_binary).map_err(RetrievalError::storage)?;
            let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
            let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
            let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;

            events.push(Attested {
                payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
                attestations: AttestationSet(attestations),
            });
        }

        Ok(events)
    }
}
704
// Some hacky shit because rust-postgres doesn't let us ask for the error kind
// TODO: remove this when https://github.com/sfackler/rust-postgres/pull/1185
//       gets merged
/// Classified flavor of a `tokio_postgres::Error`, recovered by matching the
/// SQLSTATE code and parsing the server message text (see `error_kind`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ErrorKind {
    /// Client-side "query returned an unexpected number of rows".
    RowCount,
    /// SQLSTATE UNDEFINED_TABLE: the named table does not exist.
    UndefinedTable { table: String },
    /// SQLSTATE UNDEFINED_COLUMN: the named column (optionally of a named table) does not exist.
    UndefinedColumn { table: Option<String>, column: String },
    /// Any error we don't specifically classify.
    Unknown,
    /// A recognized SQLSTATE whose message couldn't be parsed; carries the raw message.
    PostgresError(String),
}
716
717pub fn error_kind(err: &tokio_postgres::Error) -> ErrorKind {
718    let string = err.as_db_error().map(|e| e.message()).unwrap_or_default().trim().to_owned();
719    let _db_error = err.as_db_error();
720    let sql_code = err.code().cloned();
721
722    // Check the error's Display string for RowCount errors (client-side, not db error)
723    let err_string = err.to_string();
724    if err_string.contains("query returned an unexpected number of rows") || string == "query returned an unexpected number of rows" {
725        return ErrorKind::RowCount;
726    }
727
728    // Useful for adding new errors
729    // error!("postgres error: {:?}", err);
730    // error!("db_err: {:?}", err.as_db_error());
731    // error!("sql_code: {:?}", err.code());
732    // error!("err: {:?}", err);
733    // error!("err: {:?}", err.to_string());
734    debug!("postgres error: {:?}", err);
735
736    let quote_indices = |s: &str| {
737        let mut quotes = Vec::new();
738        for (index, char) in s.char_indices() {
739            if char == '"' {
740                quotes.push(index)
741            }
742        }
743        quotes
744    };
745
746    match sql_code {
747        Some(SqlState::UNDEFINED_TABLE) => {
748            // relation "album" does not exist
749            let quotes = quote_indices(&string);
750            if quotes.len() >= 2 {
751                let table = &string[quotes[0] + 1..quotes[1]];
752                ErrorKind::UndefinedTable { table: table.to_owned() }
753            } else {
754                ErrorKind::PostgresError(string.clone())
755            }
756        }
757        Some(SqlState::UNDEFINED_COLUMN) => {
758            // Handle both formats:
759            // "column "name" of relation "album" does not exist"
760            // "column "status" does not exist"
761            let quotes = quote_indices(&string);
762            if quotes.len() >= 2 {
763                let column = string[quotes[0] + 1..quotes[1]].to_owned();
764
765                let table = if quotes.len() >= 4 {
766                    // Full format with table name
767                    Some(string[quotes[2] + 1..quotes[3]].to_owned())
768                } else {
769                    // Short format without table name
770                    None
771                };
772
773                ErrorKind::UndefinedColumn { table, column }
774            } else {
775                ErrorKind::PostgresError(string.clone())
776            }
777        }
778        _ => ErrorKind::Unknown,
779    }
780}
781
/// Name of a property column that could not be materialized.
/// NOTE(review): currently unused (hence `#[allow(unused)]`) — purpose
/// inferred from the name; confirm before building on it.
#[allow(unused)]
pub struct MissingMaterialized {
    pub name: String,
}
786
787use bytes::BytesMut;
788use tokio_postgres::types::{to_sql_checked, IsNull, Type};
789
790use crate::sql_builder::SqlBuilder;
791
/// A SQL NULL that claims compatibility with every Postgres type — bound as a
/// placeholder in `set_state` when a materialized property has no value.
#[derive(Debug)]
struct UntypedNull;

impl ToSql for UntypedNull {
    // Always serialize as NULL regardless of the target column type.
    fn to_sql(&self, _ty: &Type, _out: &mut BytesMut) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> { Ok(IsNull::Yes) }

    fn accepts(_ty: &Type) -> bool {
        true // Accept all types
    }

    to_sql_checked!();
}