//! PostgreSQL storage backend for Ankurah (`ankurah_storage_postgres` crate, lib.rs).
use std::{
    collections::{hash_map::DefaultHasher, BTreeMap},
    hash::{Hash, Hasher},
    sync::{Arc, RwLock},
    time::Duration,
};

use ankurah_core::{
    error::{MutationError, RetrievalError, StateError},
    property::backend::backend_from_string,
    storage::{StorageCollection, StorageEngine},
};
use ankurah_proto::{Attestation, AttestationSet, Attested, EntityState, EventId, OperationSet, State, StateBuffers};

use futures_util::{pin_mut, TryStreamExt};

pub mod sql_builder;
pub mod value;

use value::PGValue;

use ankurah_proto::{Clock, CollectionId, EntityId, Event};
use async_trait::async_trait;
use bb8_postgres::{tokio_postgres::NoTls, PostgresConnectionManager};
use tokio_postgres::{error::SqlState, types::ToSql};
use tracing::{debug, error, info, warn};
27
/// Default connection pool size for `Postgres::open()`.
/// Production applications should configure their own pool via `Postgres::new()`.
pub const DEFAULT_POOL_SIZE: u32 = 15;

/// Default connection timeout in seconds, used by `Postgres::open()` when
/// building its pool.
pub const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
34
/// PostgreSQL-backed [`StorageEngine`] implementation. Holds a shared bb8
/// connection pool; all collections opened from this engine clone the pool.
pub struct Postgres {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
38
39impl Postgres {
40    pub fn new(pool: bb8::Pool<PostgresConnectionManager<NoTls>>) -> anyhow::Result<Self> { Ok(Self { pool }) }
41
42    pub async fn open(uri: &str) -> anyhow::Result<Self> {
43        let manager = PostgresConnectionManager::new_from_stringlike(uri, NoTls)?;
44        let pool = bb8::Pool::builder()
45            .max_size(DEFAULT_POOL_SIZE)
46            .connection_timeout(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS))
47            .build(manager)
48            .await?;
49        Self::new(pool)
50    }
51
52    // TODO: newtype this to `BucketName(&str)` with a constructor that
53    // only accepts a subset of characters.
54    pub fn sane_name(collection: &str) -> bool {
55        for char in collection.chars() {
56            match char {
57                char if char.is_alphanumeric() => {}
58                char if char.is_numeric() => {}
59                '_' | '.' | ':' => {}
60                _ => return false,
61            }
62        }
63
64        true
65    }
66}
67
/// Derive a 64-bit advisory-lock key from a string identifier by feeding it
/// through the standard library's `DefaultHasher` and reinterpreting the
/// resulting `u64` as `i64` (PostgreSQL advisory locks take a bigint key).
fn advisory_lock_key(identifier: &str) -> i64 {
    let mut state = DefaultHasher::default();
    Hash::hash(identifier, &mut state);
    state.finish() as i64
}
74
75/// Acquire a PostgreSQL advisory lock for DDL operations on a collection
76async fn acquire_ddl_lock(client: &tokio_postgres::Client, collection_id: &str) -> Result<i64, StateError> {
77    let lock_key = advisory_lock_key(&format!("ankurah_ddl:{}", collection_id));
78    debug!("Acquiring advisory lock {} for collection {}", lock_key, collection_id);
79    client.execute("SELECT pg_advisory_lock($1)", &[&lock_key]).await.map_err(|err| {
80        error!("Failed to acquire advisory lock for {}: {:?}", collection_id, err);
81        StateError::DDLError(Box::new(err))
82    })?;
83    Ok(lock_key)
84}
85
86/// Release a PostgreSQL advisory lock
87async fn release_ddl_lock(client: &tokio_postgres::Client, lock_key: i64) -> Result<(), StateError> {
88    debug!("Releasing advisory lock {}", lock_key);
89    client.execute("SELECT pg_advisory_unlock($1)", &[&lock_key]).await.map_err(|err| {
90        error!("Failed to release advisory lock {}: {:?}", lock_key, err);
91        StateError::DDLError(Box::new(err))
92    })?;
93    Ok(())
94}
95
#[async_trait]
impl StorageEngine for Postgres {
    type Value = PGValue;

    /// Open (and lazily create) the storage for one collection.
    ///
    /// Validates the collection name, snapshots the current database name
    /// (used later as `table_catalog` in information_schema queries), then
    /// creates the state/event tables and primes the column cache under a
    /// per-collection advisory lock.
    async fn collection(&self, collection_id: &CollectionId) -> Result<std::sync::Arc<dyn StorageCollection>, RetrievalError> {
        if !Postgres::sane_name(collection_id.as_str()) {
            return Err(RetrievalError::InvalidBucketName);
        }

        let mut client = self.pool.get().await.map_err(RetrievalError::storage)?;

        // get the current schema from the database
        let schema = client.query_one("SELECT current_database()", &[]).await.map_err(RetrievalError::storage)?;
        let schema = schema.get("current_database");

        let bucket = PostgresBucket {
            pool: self.pool.clone(),
            schema,
            collection_id: collection_id.clone(),
            columns: Arc::new(RwLock::new(Vec::new())),
            #[cfg(debug_assertions)]
            last_spilled_predicate: Arc::new(RwLock::new(None)),
        };

        // Acquire advisory lock to serialize DDL operations for this collection
        let lock_key = acquire_ddl_lock(&client, collection_id.as_str()).await?;

        // Create tables if they don't exist (protected by advisory lock)
        let result = async {
            bucket.create_state_table(&mut client).await?;
            bucket.create_event_table(&mut client).await?;
            bucket.rebuild_columns_cache(&mut client).await?;
            Ok::<_, StateError>(())
        }
        .await;

        // Always release the lock, even if DDL failed
        // NOTE(review): if the release itself fails, its error is returned and
        // any DDL error captured in `result` is silently discarded — confirm
        // this precedence is intended.
        release_ddl_lock(&client, lock_key).await?;

        result?;
        Ok(Arc::new(bucket))
    }

    /// Drop EVERY table in the `public` schema — not just tables created by
    /// this engine. Returns Ok(false) when there was nothing to drop.
    async fn delete_all_collections(&self) -> Result<bool, MutationError> {
        let mut client = self.pool.get().await.map_err(|err| MutationError::General(Box::new(err)))?;

        // Get all tables in the public schema
        let query = r#"
            SELECT table_name 
            FROM information_schema.tables 
            WHERE table_schema = 'public'
        "#;

        let rows = client.query(query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
        if rows.is_empty() {
            return Ok(false);
        }

        // Start a transaction to drop all tables atomically
        let transaction = client.transaction().await.map_err(|err| MutationError::General(Box::new(err)))?;

        // Drop each table (names are quoted; they come from the catalog, not user input)
        for row in rows {
            let table_name: String = row.get("table_name");
            let drop_query = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
            transaction.execute(&drop_query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
        }

        // Commit the transaction
        transaction.commit().await.map_err(|err| MutationError::General(Box::new(err)))?;

        Ok(true)
    }
}
170
/// Cached metadata for one column of a collection's state table, as read
/// from `information_schema.columns` by `rebuild_columns_cache`.
#[derive(Clone, Debug)]
pub struct PostgresColumn {
    // `column_name` from information_schema
    pub name: String,
    // true when information_schema reports `is_nullable` = 'YES'
    pub is_nullable: bool,
    // raw `data_type` string as reported by information_schema
    pub data_type: String,
}
177
/// Storage for a single collection: one state table (named after the
/// collection) plus one `<collection>_event` table, reached through the
/// engine's shared connection pool.
pub struct PostgresBucket {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
    collection_id: CollectionId,
    // Despite the name, this holds the result of `SELECT current_database()`;
    // it is used as `table_catalog` when querying information_schema.
    schema: String,
    // Cached column metadata for the state table; refreshed after DDL.
    columns: Arc<RwLock<Vec<PostgresColumn>>>,
    /// Tracks the last predicate that spilled to post-filtering (debug builds only)
    #[cfg(debug_assertions)]
    last_spilled_predicate: Arc<RwLock<Option<ankql::ast::Predicate>>>,
}
187
impl PostgresBucket {
    /// Name of the state table; the collection id is used verbatim
    /// (it was validated by `Postgres::sane_name` when the bucket was opened).
    fn state_table(&self) -> String { self.collection_id.as_str().to_string() }

    /// Name of the companion event table: `<collection>_event`.
    pub fn event_table(&self) -> String { format!("{}_event", self.collection_id.as_str()) }

    /// Returns the last predicate that spilled to post-filtering (debug builds only).
    ///
    /// Use this in tests to verify queries are fully pushed down to PostgreSQL:
    /// ```rust,ignore
    /// let spilled = bucket.last_spilled_predicate();
    /// assert!(spilled.is_none(), "Expected full pushdown, but got spill: {:?}", spilled);
    /// ```
    #[cfg(debug_assertions)]
    pub fn last_spilled_predicate(&self) -> Option<ankql::ast::Predicate> { self.last_spilled_predicate.read().unwrap().clone() }

    /// Rebuild the cache of columns in the table.
    ///
    /// Queries `information_schema.columns` keyed by database name and table
    /// name, then atomically replaces the cached list under the write lock.
    pub async fn rebuild_columns_cache(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        debug!("PostgresBucket({}).rebuild_columns_cache", self.collection_id);
        let column_query =
            r#"SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_catalog = $1 AND table_name = $2;"#
                .to_string();
        let mut new_columns = Vec::new();
        debug!("Querying existing columns: {:?}, [{:?}, {:?}]", column_query, &self.schema, &self.collection_id.as_str());
        let rows = client
            .query(&column_query, &[&self.schema, &self.collection_id.as_str()])
            .await
            .map_err(|err| StateError::DDLError(Box::new(err)))?;
        for row in rows {
            // information_schema reports nullability as the string 'YES'/'NO'
            let is_nullable: String = row.get("is_nullable");
            new_columns.push(PostgresColumn {
                name: row.get("column_name"),
                is_nullable: is_nullable.eq("YES"),
                data_type: row.get("data_type"),
            })
        }

        // Swap in the fresh list, then drop the guard promptly.
        let mut columns = self.columns.write().unwrap();
        *columns = new_columns;
        drop(columns);

        Ok(())
    }

    /// Names of all columns currently in the cache (may be stale until the
    /// next `rebuild_columns_cache`).
    pub fn existing_columns(&self) -> Vec<String> {
        let columns = self.columns.read().unwrap();
        columns.iter().map(|column| column.name.clone()).collect()
    }

    /// Look up cached metadata for a single column by name.
    pub fn column(&self, column_name: &String) -> Option<PostgresColumn> {
        let columns = self.columns.read().unwrap();
        columns.iter().find(|column| column.name == *column_name).cloned()
    }

    /// True if the cached schema contains the named column.
    pub fn has_column(&self, column_name: &String) -> bool { self.column(column_name).is_some() }

    /// Create the event table for this collection if it does not already exist.
    pub async fn create_event_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        let create_query = format!(
            r#"CREATE TABLE IF NOT EXISTS "{}"(
                "id" character(43) PRIMARY KEY,
                "entity_id" character(22),
                "operations" bytea,
                "parent" character(43)[],
                "attestations" bytea
            )"#,
            self.event_table()
        );

        debug!("{create_query}");
        client.execute(&create_query, &[]).await.map_err(|e| StateError::DDLError(Box::new(e)))?;
        Ok(())
    }

    /// Create the state table for this collection if it does not already
    /// exist; logs full database error details on failure.
    pub async fn create_state_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        let create_query = format!(
            r#"CREATE TABLE IF NOT EXISTS "{}"(
                "id" character(22) PRIMARY KEY,
                "state_buffer" BYTEA,
                "head" character(43)[],
                "attestations" BYTEA[]
            )"#,
            self.state_table()
        );

        debug!("{create_query}");
        match client.execute(&create_query, &[]).await {
            Ok(_) => Ok(()),
            Err(err) => {
                // Log full error details for debugging
                if let Some(db_err) = err.as_db_error() {
                    error!("PostgresBucket({}).create_state_table error: {} (code: {:?})", self.collection_id, db_err, db_err.code());
                } else {
                    error!("PostgresBucket({}).create_state_table error: {:?}", self.collection_id, err);
                }
                Err(StateError::DDLError(Box::new(err)))
            }
        }
    }

    /// Add the given `(column name, postgres type)` pairs to the state table
    /// if they are not already present.
    ///
    /// Runs under the collection's advisory lock; the column cache is
    /// re-checked after acquiring the lock because another session may have
    /// added the columns concurrently. The cache is rebuilt again on both the
    /// success and failure paths so it reflects whatever DDL actually landed.
    pub async fn add_missing_columns(
        &self,
        client: &mut tokio_postgres::Client,
        missing: Vec<(String, &'static str)>, // column name, datatype
    ) -> Result<(), StateError> {
        if missing.is_empty() {
            return Ok(());
        }

        // Acquire advisory lock to serialize DDL operations for this collection
        let lock_key = acquire_ddl_lock(client, self.collection_id.as_str()).await?;

        let result = async {
            // Re-check columns after acquiring lock (another session may have added them)
            self.rebuild_columns_cache(client).await?;

            for (column, datatype) in missing {
                // Silently skip unsafe names; callers only learn about DDL errors.
                if Postgres::sane_name(&column) && !self.has_column(&column) {
                    let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, self.state_table(), column, datatype);
                    info!("PostgresBucket({}).add_missing_columns: {}", self.collection_id, alter_query);
                    match client.execute(&alter_query, &[]).await {
                        Ok(_) => {}
                        Err(err) => {
                            // Log full error details for debugging
                            if let Some(db_err) = err.as_db_error() {
                                warn!(
                                    "Error adding column {} to table {}: {} (code: {:?})",
                                    column,
                                    self.state_table(),
                                    db_err,
                                    db_err.code()
                                );
                            } else {
                                warn!("Error adding column {} to table {}: {:?}", column, self.state_table(), err);
                            }
                            self.rebuild_columns_cache(client).await?;
                            return Err(StateError::DDLError(Box::new(err)));
                        }
                    }
                }
            }

            self.rebuild_columns_cache(client).await?;
            Ok(())
        }
        .await;

        // Always release the lock
        release_ddl_lock(client, lock_key).await?;

        result
    }
}
339
340#[async_trait]
341impl StorageCollection for PostgresBucket {
    /// Upsert an entity's state row, materializing per-property values into
    /// typed columns, and return whether the stored head actually changed.
    async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
        let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
        let attestations: Vec<Vec<u8>> = state.attestations.iter().map(bincode::serialize).collect::<Result<Vec<_>, _>>()?;
        let id = state.payload.entity_id;

        // Ensure head is not empty for new records
        if state.payload.state.head.is_empty() {
            warn!("Warning: Empty head detected for entity {}", id);
        }

        let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;

        // Fixed columns come first ($1..$4); materialized property columns are
        // appended below. `params` borrows locals, so everything it references
        // must outlive the query call.
        let mut columns: Vec<String> = vec!["id".to_owned(), "state_buffer".to_owned(), "head".to_owned(), "attestations".to_owned()];
        let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
        params.push(&id);
        params.push(&state_buffers);
        params.push(&state.payload.state.head);
        params.push(&attestations);

        let mut materialized: Vec<(String, Option<PGValue>)> = Vec::new();
        let mut seen_properties = std::collections::HashSet::new();

        // Process property values directly from state buffers
        for (name, state_buffer) in state.payload.state.state_buffers.iter() {
            let backend = backend_from_string(name, Some(state_buffer))?;
            for (column, value) in backend.property_values() {
                if !seen_properties.insert(column.clone()) {
                    // Skip if property already seen in another backend
                    // TODO: this should cause all (or subsequent?) fields with the same name
                    // to be suffixed with the property id when we have property ids
                    // requires some thought (and field metadata) on how to do this right
                    continue;
                }

                let pg_value: Option<PGValue> = value.map(|value| value.into());
                if !self.has_column(&column) {
                    // We don't have the column yet and we know the type.
                    if let Some(ref pg_value) = pg_value {
                        self.add_missing_columns(&mut client, vec![(column.clone(), pg_value.postgres_type())]).await?;
                    } else {
                        // The column doesn't exist yet and we don't have a value.
                        // This means the entire column is already null/none so we
                        // don't need to set anything.
                        continue;
                    }
                }

                materialized.push((column.clone(), pg_value));
            }
        }

        // Bind each materialized value with its concrete PG type. NULLs use
        // `UntypedNull` (declared elsewhere in this file — not visible here).
        for (name, parameter) in &materialized {
            columns.push(name.clone());

            match &parameter {
                Some(value) => match value {
                    PGValue::CharacterVarying(string) => params.push(string),
                    PGValue::SmallInt(number) => params.push(number),
                    PGValue::Integer(number) => params.push(number),
                    PGValue::BigInt(number) => params.push(number),
                    PGValue::DoublePrecision(float) => params.push(float),
                    PGValue::Bytea(bytes) => params.push(bytes),
                    PGValue::Boolean(bool) => params.push(bool),
                    PGValue::Jsonb(json_val) => params.push(json_val),
                },
                None => params.push(&UntypedNull),
            }
        }
        let columns_str = columns.iter().map(|name| format!("\"{}\"", name)).collect::<Vec<String>>().join(", ");
        let values_str = params.iter().enumerate().map(|(index, _)| format!("${}", index + 1)).collect::<Vec<String>>().join(", ");
        let columns_update_str = columns
            .iter()
            .enumerate()
            .skip(1) // Skip "id"
            .map(|(index, name)| format!("\"{}\" = ${}", name, index + 1))
            .collect::<Vec<String>>()
            .join(", ");

        // be careful with sql injection via bucket name
        // The CTE captures the pre-upsert head so we can tell callers whether
        // anything actually changed; old_head is NULL for brand-new rows.
        let query = format!(
            r#"WITH old_state AS (
                SELECT "head" FROM "{0}" WHERE "id" = $1
            )
            INSERT INTO "{0}"({1}) VALUES({2})
            ON CONFLICT("id") DO UPDATE SET {3}
            RETURNING (SELECT "head" FROM old_state) as old_head"#,
            self.state_table(),
            columns_str,
            values_str,
            columns_update_str
        );

        debug!("PostgresBucket({}).set_state: {}", self.collection_id, query);
        // If the state table is missing, create it and retry exactly once.
        let mut created_table = false;
        let row = loop {
            match client.query_one(&query, params.as_slice()).await {
                Ok(row) => break row,
                Err(err) => {
                    let kind = error_kind(&err);
                    if let ErrorKind::UndefinedTable { table } = kind {
                        if table == self.state_table() && !created_table {
                            self.create_state_table(&mut client).await?;
                            created_table = true;
                            continue; // retry exactly once
                        }
                    }
                    return Err(StateError::DDLError(Box::new(err)).into());
                }
            }
        };

        // If this is a new entity (no old_head), or if the heads are different, return true
        let old_head: Option<Clock> = row.get("old_head");
        let changed = match old_head {
            None => true, // New entity
            Some(old_head) => old_head != state.payload.state.head,
        };

        debug!("PostgresBucket({}).set_state: Changed: {}", self.collection_id, changed);
        Ok(changed)
    }
463
    /// Load one entity's state row by id.
    ///
    /// Returns `EntityNotFound` when the row is absent; if the whole table is
    /// missing, it is created first and `EntityNotFound` returned.
    async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
        // be careful with sql injection via bucket name
        let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = $1"#, self.state_table());

        let mut client = match self.pool.get().await {
            Ok(client) => client,
            Err(err) => {
                return Err(RetrievalError::StorageError(err.into()));
            }
        };

        debug!("PostgresBucket({}).get_state: {}", self.collection_id, query);
        let rows = match client.query(&query, &[&id]).await {
            Ok(rows) => rows,
            Err(err) => {
                let kind = error_kind(&err);
                if let ErrorKind::UndefinedTable { table } = kind {
                    if table == self.state_table() {
                        // Table missing: create it so later calls succeed, then
                        // report the entity as not found (it can't exist yet).
                        self.create_state_table(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
                        return Err(RetrievalError::EntityNotFound(id));
                    }
                }
                return Err(RetrievalError::StorageError(err.into()));
            }
        };

        let row = match rows.into_iter().next() {
            Some(row) => row,
            None => return Err(RetrievalError::EntityNotFound(id)),
        };

        debug!("PostgresBucket({}).get_state: Row: {:?}", self.collection_id, row);
        // Sanity check: the row we got back must be the id we asked for.
        let row_id: EntityId = row.try_get("id").map_err(RetrievalError::storage)?;
        assert_eq!(row_id, id);

        // Decode the bincode-serialized state buffers and attestations.
        let serialized_buffers: Vec<u8> = row.try_get("state_buffer").map_err(RetrievalError::storage)?;
        let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&serialized_buffers).map_err(RetrievalError::storage)?;
        let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
        let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
        let attestations = attestation_bytes
            .into_iter()
            .map(|bytes| bincode::deserialize(&bytes))
            .collect::<Result<Vec<Attestation>, _>>()
            .map_err(RetrievalError::storage)?;

        Ok(Attested {
            payload: EntityState {
                entity_id: id,
                collection: self.collection_id.clone(),
                state: State { state_buffers: StateBuffers(state_buffers), head },
            },
            attestations: AttestationSet(attestations),
        })
    }
518
519    async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
520        debug!("fetch_states: {:?}", selection);
521        let mut client = self.pool.get().await.map_err(|err| RetrievalError::StorageError(Box::new(err)))?;
522
523        // Pre-filter selection based on cached schema to avoid undefined column errors.
524        // If we see columns not in our cache, refresh it first (they might have been added).
525        // TODO: Once property metadata is in the system catalog, we can create missing columns
526        // on-demand here instead of refreshing the cache each time we see unknown columns.
527        let referenced = selection.referenced_columns();
528        let cached = self.existing_columns();
529        let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();
530
531        // Refresh cache if we see columns we haven't seen before
532        if !unknown_to_cache.is_empty() {
533            debug!("PostgresBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
534            self.rebuild_columns_cache(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
535        }
536
537        // Now check with (possibly refreshed) cache - columns still missing truly don't exist
538        let existing = self.existing_columns();
539        let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();
540
541        let effective_selection = if missing.is_empty() {
542            selection.clone()
543        } else {
544            debug!("PostgresBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
545            selection.assume_null(&missing)
546        };
547
548        // Split predicate into parts we can pushdown to PostgreSQL vs post-filter in Rust
549        let split = sql_builder::split_predicate_for_postgres(&effective_selection.predicate);
550        let needs_post_filter = split.needs_post_filter();
551        let remaining_predicate = split.remaining_predicate; // Cache before moving sql_predicate
552        debug!(
553            "PostgresBucket({}).fetch_states: SQL predicate: {:?}, remaining: {:?}, needs_post_filter: {}",
554            self.collection_id, split.sql_predicate, remaining_predicate, needs_post_filter
555        );
556
557        // Track spilled predicate for test assertions (debug builds only)
558        #[cfg(debug_assertions)]
559        {
560            let mut spilled = self.last_spilled_predicate.write().unwrap();
561            *spilled = if needs_post_filter { Some(remaining_predicate.clone()) } else { None };
562        }
563
564        // Track spilled predicate for test assertions (debug builds only)
565        #[cfg(debug_assertions)]
566        {
567            let spilled = if needs_post_filter { Some(remaining_predicate.clone()) } else { None };
568            *self.last_spilled_predicate.write().unwrap() = spilled;
569        }
570
571        // Build SQL with only the pushdown-capable predicate
572        let sql_selection = ankql::ast::Selection {
573            predicate: split.sql_predicate,
574            order_by: effective_selection.order_by.clone(),
575            limit: if needs_post_filter {
576                None // Can't limit in SQL if we need to post-filter (would drop valid results)
577            } else {
578                effective_selection.limit
579            },
580        };
581
582        let mut results = Vec::new();
583        let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
584        builder.table_name(self.state_table());
585        builder.selection(&sql_selection)?;
586
587        let (sql, args) = builder.build()?;
588        debug!("PostgresBucket({}).fetch_states: SQL: {} with args: {:?}", self.collection_id, sql, args);
589
590        let stream = match client.query_raw(&sql, args).await {
591            Ok(stream) => stream,
592            Err(err) => {
593                let kind = error_kind(&err);
594                if let ErrorKind::UndefinedTable { table } = kind {
595                    if table == self.state_table() {
596                        // Table doesn't exist yet, return empty results
597                        return Ok(Vec::new());
598                    }
599                }
600                return Err(RetrievalError::StorageError(err.into()));
601            }
602        };
603        pin_mut!(stream);
604
605        while let Some(row) = stream.try_next().await.map_err(RetrievalError::storage)? {
606            let id: EntityId = row.try_get(0).map_err(RetrievalError::storage)?;
607            let state_buffer: Vec<u8> = row.try_get(1).map_err(RetrievalError::storage)?;
608            let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(RetrievalError::storage)?;
609            let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
610            let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
611            let attestations = attestation_bytes
612                .into_iter()
613                .map(|bytes| bincode::deserialize(&bytes))
614                .collect::<Result<Vec<Attestation>, _>>()
615                .map_err(RetrievalError::storage)?;
616
617            results.push(Attested {
618                payload: EntityState {
619                    entity_id: id,
620                    collection: self.collection_id.clone(),
621                    state: State { state_buffers: StateBuffers(state_buffers), head },
622                },
623                attestations: AttestationSet(attestations),
624            });
625        }
626
627        // Post-filter results if we have remaining predicate that couldn't be pushed down
628        let results = if needs_post_filter {
629            debug!(
630                "PostgresBucket({}).fetch_states: Post-filtering {} results with remaining predicate",
631                self.collection_id,
632                results.len()
633            );
634            let filtered = post_filter_states(&results, &remaining_predicate, &self.collection_id);
635
636            // Apply limit after post-filter if needed
637            if let Some(limit) = effective_selection.limit {
638                filtered.into_iter().take(limit as usize).collect()
639            } else {
640                filtered
641            }
642        } else {
643            results
644        };
645
646        Ok(results)
647    }
648
    /// Insert one event row; idempotent via `ON CONFLICT ("id") DO NOTHING`.
    ///
    /// Returns true when a row was actually inserted (false means the event
    /// id already existed). Creates the event table and retries once if it is
    /// missing.
    async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
        let operations = bincode::serialize(&entity_event.payload.operations)?;
        let attestations = bincode::serialize(&entity_event.attestations)?;

        let query = format!(
            r#"INSERT INTO "{0}"("id", "entity_id", "operations", "parent", "attestations") VALUES($1, $2, $3, $4, $5)
               ON CONFLICT ("id") DO NOTHING"#,
            self.event_table(),
        );

        let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
        debug!("PostgresBucket({}).add_event: {}", self.collection_id, query);
        let mut created_table = false;
        let affected = loop {
            match client
                .execute(
                    &query,
                    &[
                        &entity_event.payload.id(),
                        &entity_event.payload.entity_id,
                        &operations,
                        &entity_event.payload.parent,
                        &attestations,
                    ],
                )
                .await
            {
                Ok(affected) => break affected,
                Err(err) => {
                    let kind = error_kind(&err);
                    if let ErrorKind::UndefinedTable { table } = kind {
                        if table == self.event_table() && !created_table {
                            self.create_event_table(&mut client).await?;
                            created_table = true;
                            continue; // retry exactly once
                        }
                    }
                    error!("PostgresBucket({}).add_event: Error: {:?}", self.collection_id, err);
                    return Err(StateError::DMLError(Box::new(err)).into());
                }
            }
        };

        Ok(affected > 0)
    }
694
    /// Fetch the given events by id from the event table.
    ///
    /// Missing ids are silently omitted; a missing event table yields an
    /// empty result rather than an error.
    async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
        if event_ids.is_empty() {
            return Ok(Vec::new());
        }

        let query = format!(
            r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{0}" WHERE "id" = ANY($1)"#,
            self.event_table(),
        );

        let client = self.pool.get().await.map_err(RetrievalError::storage)?;
        let rows = match client.query(&query, &[&event_ids]).await {
            Ok(rows) => rows,
            Err(err) => {
                let kind = error_kind(&err);
                match kind {
                    ErrorKind::UndefinedTable { table } if table == self.event_table() => return Ok(Vec::new()),
                    _ => return Err(RetrievalError::storage(err)),
                }
            }
        };

        // Decode each row; attestations are a single bincode blob here
        // (unlike the state table, which stores a BYTEA[]).
        let mut events = Vec::new();
        for row in rows {
            let entity_id: EntityId = row.try_get("entity_id").map_err(RetrievalError::storage)?;
            let operations: OperationSet = row.try_get("operations").map_err(RetrievalError::storage)?;
            let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
            let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
            let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;

            let event = Attested {
                payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
                attestations: AttestationSet(attestations),
            };
            events.push(event);
        }
        Ok(events)
    }
733
734    async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, ankurah_core::error::RetrievalError> {
735        let query =
736            format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{0}" WHERE "entity_id" = $1"#, self.event_table(),);
737
738        let client = self.pool.get().await.map_err(RetrievalError::storage)?;
739        debug!("PostgresBucket({}).get_events: {}", self.collection_id, query);
740        let rows = match client.query(&query, &[&entity_id]).await {
741            Ok(rows) => rows,
742            Err(err) => {
743                let kind = error_kind(&err);
744                if let ErrorKind::UndefinedTable { table } = kind {
745                    if table == self.event_table() {
746                        return Ok(Vec::new());
747                    }
748                }
749
750                return Err(RetrievalError::storage(err));
751            }
752        };
753
754        let mut events = Vec::new();
755        for row in rows {
756            // let event_id: EventId = row.try_get("id").map_err(|err| RetrievalError::storage(err))?;
757            let operations_binary: Vec<u8> = row.try_get("operations").map_err(RetrievalError::storage)?;
758            let operations = bincode::deserialize(&operations_binary).map_err(RetrievalError::storage)?;
759            let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
760            let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
761            let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;
762
763            events.push(Attested {
764                payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
765                attestations: AttestationSet(attestations),
766            });
767        }
768
769        Ok(events)
770    }
771}
772
// Structured classification of tokio_postgres errors, recovered by parsing
// server error messages, because rust-postgres doesn't let us ask for the
// error kind directly.
// TODO: remove this when https://github.com/sfackler/rust-postgres/pull/1185
//       gets merged
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ErrorKind {
    /// Client-side "query returned an unexpected number of rows" error.
    RowCount,
    /// SQLSTATE UNDEFINED_TABLE; `table` parsed from the quoted relation name.
    UndefinedTable { table: String },
    /// SQLSTATE UNDEFINED_COLUMN; `table` is only present when the server
    /// message uses the long form that also names the relation.
    UndefinedColumn { table: Option<String>, column: String },
    /// Any error we did not recognize.
    Unknown,
    /// A recognized SQLSTATE whose message we could not parse; carries the raw
    /// server message.
    PostgresError(String),
}
784
785pub fn error_kind(err: &tokio_postgres::Error) -> ErrorKind {
786    let string = err.as_db_error().map(|e| e.message()).unwrap_or_default().trim().to_owned();
787    let _db_error = err.as_db_error();
788    let sql_code = err.code().cloned();
789
790    // Check the error's Display string for RowCount errors (client-side, not db error)
791    let err_string = err.to_string();
792    if err_string.contains("query returned an unexpected number of rows") || string == "query returned an unexpected number of rows" {
793        return ErrorKind::RowCount;
794    }
795
796    // Useful for adding new errors
797    // error!("postgres error: {:?}", err);
798    // error!("db_err: {:?}", err.as_db_error());
799    // error!("sql_code: {:?}", err.code());
800    // error!("err: {:?}", err);
801    // error!("err: {:?}", err.to_string());
802    debug!("postgres error: {:?}", err);
803
804    let quote_indices = |s: &str| {
805        let mut quotes = Vec::new();
806        for (index, char) in s.char_indices() {
807            if char == '"' {
808                quotes.push(index)
809            }
810        }
811        quotes
812    };
813
814    match sql_code {
815        Some(SqlState::UNDEFINED_TABLE) => {
816            // relation "album" does not exist
817            let quotes = quote_indices(&string);
818            if quotes.len() >= 2 {
819                let table = &string[quotes[0] + 1..quotes[1]];
820                ErrorKind::UndefinedTable { table: table.to_owned() }
821            } else {
822                ErrorKind::PostgresError(string.clone())
823            }
824        }
825        Some(SqlState::UNDEFINED_COLUMN) => {
826            // Handle both formats:
827            // "column "name" of relation "album" does not exist"
828            // "column "status" does not exist"
829            let quotes = quote_indices(&string);
830            if quotes.len() >= 2 {
831                let column = string[quotes[0] + 1..quotes[1]].to_owned();
832
833                let table = if quotes.len() >= 4 {
834                    // Full format with table name
835                    Some(string[quotes[2] + 1..quotes[3]].to_owned())
836                } else {
837                    // Short format without table name
838                    None
839                };
840
841                ErrorKind::UndefinedColumn { table, column }
842            } else {
843                ErrorKind::PostgresError(string.clone())
844            }
845        }
846        _ => ErrorKind::Unknown,
847    }
848}
849
/// Name of a materialized column that was expected but not found.
// NOTE(review): currently unused (hence the allow) — presumably reserved for
// schema-repair/migration logic; confirm before removing.
#[allow(unused)]
pub struct MissingMaterialized {
    pub name: String,
}
854
855use bytes::BytesMut;
856use tokio_postgres::types::{to_sql_checked, IsNull, Type};
857
858use crate::sql_builder::SqlBuilder;
859
860/// Post-filter EntityStates using a predicate that couldn't be pushed to SQL.
861///
862/// This is the escape hatch for predicates that PostgreSQL can't handle natively,
863/// such as complex JSON traversals or future features like Ref traversal.
864fn post_filter_states(
865    states: &[Attested<EntityState>],
866    predicate: &ankql::ast::Predicate,
867    collection_id: &CollectionId,
868) -> Vec<Attested<EntityState>> {
869    use ankurah_core::entity::TemporaryEntity;
870    use ankurah_core::selection::filter::evaluate_predicate;
871
872    states
873        .iter()
874        .filter(|attested| {
875            // Create a TemporaryEntity for filtering (implements Filterable)
876            match TemporaryEntity::new(attested.payload.entity_id, collection_id.clone(), &attested.payload.state) {
877                Ok(temp_entity) => {
878                    // Evaluate the predicate
879                    match evaluate_predicate(&temp_entity, predicate) {
880                        Ok(true) => true,
881                        Ok(false) => false,
882                        Err(e) => {
883                            warn!("Post-filter evaluation error for entity {}: {}", attested.payload.entity_id, e);
884                            false // Exclude entities that fail evaluation
885                        }
886                    }
887                }
888                Err(e) => {
889                    warn!("Failed to create TemporaryEntity for post-filtering {}: {}", attested.payload.entity_id, e);
890                    false // Exclude entities we can't evaluate
891                }
892            }
893        })
894        .cloned()
895        .collect()
896}
897
/// A `ToSql` placeholder that binds SQL `NULL` for any target column type.
#[derive(Debug)]
struct UntypedNull;
900
impl ToSql for UntypedNull {
    // Always encodes NULL; never writes into the output buffer.
    fn to_sql(&self, _ty: &Type, _out: &mut BytesMut) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> { Ok(IsNull::Yes) }

    fn accepts(_ty: &Type) -> bool {
        true // Accept all types — NULL is valid for any column
    }

    to_sql_checked!();
}
909}