1use std::{
2 collections::{hash_map::DefaultHasher, BTreeMap},
3 hash::{Hash, Hasher},
4 sync::{Arc, RwLock},
5 time::Duration,
6};
7
8use ankurah_core::{
9 error::{MutationError, RetrievalError, StateError},
10 property::backend::backend_from_string,
11 storage::{StorageCollection, StorageEngine},
12};
13use ankurah_proto::{Attestation, AttestationSet, Attested, EntityState, EventId, OperationSet, State, StateBuffers};
14
15use futures_util::{pin_mut, TryStreamExt};
16
17pub mod sql_builder;
18pub mod value;
19
20use value::PGValue;
21
22use ankurah_proto::{Clock, CollectionId, EntityId, Event};
23use async_trait::async_trait;
24use bb8_postgres::{tokio_postgres::NoTls, PostgresConnectionManager};
25use tokio_postgres::{error::SqlState, types::ToSql};
26use tracing::{debug, error, info, warn};
27
/// Default maximum number of pooled Postgres connections.
pub const DEFAULT_POOL_SIZE: u32 = 15;

/// Default timeout, in seconds, when waiting for a connection from the pool.
pub const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
/// Postgres-backed storage engine: a thin wrapper around a bb8 connection
/// pool (no TLS).
pub struct Postgres {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
}
38
39impl Postgres {
40 pub fn new(pool: bb8::Pool<PostgresConnectionManager<NoTls>>) -> anyhow::Result<Self> { Ok(Self { pool }) }
41
42 pub async fn open(uri: &str) -> anyhow::Result<Self> {
43 let manager = PostgresConnectionManager::new_from_stringlike(uri, NoTls)?;
44 let pool = bb8::Pool::builder()
45 .max_size(DEFAULT_POOL_SIZE)
46 .connection_timeout(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS))
47 .build(manager)
48 .await?;
49 Self::new(pool)
50 }
51
52 pub fn sane_name(collection: &str) -> bool {
55 for char in collection.chars() {
56 match char {
57 char if char.is_alphanumeric() => {}
58 char if char.is_numeric() => {}
59 '_' | '.' | ':' => {}
60 _ => return false,
61 }
62 }
63
64 true
65 }
66}
67
/// Derives a signed 64-bit Postgres advisory-lock key from an arbitrary
/// identifier by feeding it through the std `DefaultHasher`.
fn advisory_lock_key(identifier: &str) -> i64 {
    let mut state = DefaultHasher::new();
    identifier.hash(&mut state);
    state.finish() as i64
}
74
75async fn acquire_ddl_lock(client: &tokio_postgres::Client, collection_id: &str) -> Result<i64, StateError> {
77 let lock_key = advisory_lock_key(&format!("ankurah_ddl:{}", collection_id));
78 debug!("Acquiring advisory lock {} for collection {}", lock_key, collection_id);
79 client.execute("SELECT pg_advisory_lock($1)", &[&lock_key]).await.map_err(|err| {
80 error!("Failed to acquire advisory lock for {}: {:?}", collection_id, err);
81 StateError::DDLError(Box::new(err))
82 })?;
83 Ok(lock_key)
84}
85
86async fn release_ddl_lock(client: &tokio_postgres::Client, lock_key: i64) -> Result<(), StateError> {
88 debug!("Releasing advisory lock {}", lock_key);
89 client.execute("SELECT pg_advisory_unlock($1)", &[&lock_key]).await.map_err(|err| {
90 error!("Failed to release advisory lock {}: {:?}", lock_key, err);
91 StateError::DDLError(Box::new(err))
92 })?;
93 Ok(())
94}
95
96#[async_trait]
97impl StorageEngine for Postgres {
98 type Value = PGValue;
99
100 async fn collection(&self, collection_id: &CollectionId) -> Result<std::sync::Arc<dyn StorageCollection>, RetrievalError> {
101 if !Postgres::sane_name(collection_id.as_str()) {
102 return Err(RetrievalError::InvalidBucketName);
103 }
104
105 let mut client = self.pool.get().await.map_err(RetrievalError::storage)?;
106
107 let schema = client.query_one("SELECT current_database()", &[]).await.map_err(RetrievalError::storage)?;
109 let schema = schema.get("current_database");
110
111 let bucket = PostgresBucket {
112 pool: self.pool.clone(),
113 schema,
114 collection_id: collection_id.clone(),
115 columns: Arc::new(RwLock::new(Vec::new())),
116 #[cfg(debug_assertions)]
117 last_spilled_predicate: Arc::new(RwLock::new(None)),
118 };
119
120 let lock_key = acquire_ddl_lock(&client, collection_id.as_str()).await?;
122
123 let result = async {
125 bucket.create_state_table(&mut client).await?;
126 bucket.create_event_table(&mut client).await?;
127 bucket.rebuild_columns_cache(&mut client).await?;
128 Ok::<_, StateError>(())
129 }
130 .await;
131
132 release_ddl_lock(&client, lock_key).await?;
134
135 result?;
136 Ok(Arc::new(bucket))
137 }
138
139 async fn delete_all_collections(&self) -> Result<bool, MutationError> {
140 let mut client = self.pool.get().await.map_err(|err| MutationError::General(Box::new(err)))?;
141
142 let query = r#"
144 SELECT table_name
145 FROM information_schema.tables
146 WHERE table_schema = 'public'
147 "#;
148
149 let rows = client.query(query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
150 if rows.is_empty() {
151 return Ok(false);
152 }
153
154 let transaction = client.transaction().await.map_err(|err| MutationError::General(Box::new(err)))?;
156
157 for row in rows {
159 let table_name: String = row.get("table_name");
160 let drop_query = format!(r#"DROP TABLE IF EXISTS "{}""#, table_name);
161 transaction.execute(&drop_query, &[]).await.map_err(|err| MutationError::General(Box::new(err)))?;
162 }
163
164 transaction.commit().await.map_err(|err| MutationError::General(Box::new(err)))?;
166
167 Ok(true)
168 }
169}
170
/// Cached metadata for one column of a collection's state table, as read
/// from `information_schema.columns`.
#[derive(Clone, Debug)]
pub struct PostgresColumn {
    pub name: String,
    /// True when Postgres reported `is_nullable` as "YES".
    pub is_nullable: bool,
    /// Raw `data_type` string as reported by Postgres (e.g. "bigint").
    pub data_type: String,
}
177
/// Handle to one collection's pair of tables (`<id>` for entity states,
/// `<id>_event` for events) plus a cached column schema.
pub struct PostgresBucket {
    pool: bb8::Pool<PostgresConnectionManager<NoTls>>,
    collection_id: CollectionId,
    // Actually holds the current *database* name (set from
    // `SELECT current_database()`); matched against `table_catalog` when
    // refreshing the column cache.
    schema: String,
    // Cached snapshot of the state table's columns; refreshed after DDL.
    columns: Arc<RwLock<Vec<PostgresColumn>>>,
    // Debug-only: the last predicate that had to be evaluated client-side
    // because it could not be pushed down into SQL.
    #[cfg(debug_assertions)]
    last_spilled_predicate: Arc<RwLock<Option<ankql::ast::Predicate>>>,
}
187
impl PostgresBucket {
    /// Name of the table holding entity-state rows (the collection id itself).
    fn state_table(&self) -> String { self.collection_id.as_str().to_string() }

    /// Name of the table holding this collection's events.
    pub fn event_table(&self) -> String { format!("{}_event", self.collection_id.as_str()) }

    /// Debug-only accessor: the last predicate `fetch_states` had to evaluate
    /// client-side, or `None` when everything pushed down to SQL.
    #[cfg(debug_assertions)]
    pub fn last_spilled_predicate(&self) -> Option<ankql::ast::Predicate> { self.last_spilled_predicate.read().unwrap().clone() }

    /// Refreshes the in-memory column cache from `information_schema.columns`.
    ///
    /// Note: `self.schema` (the current database name) is matched against
    /// `table_catalog`, not `table_schema`.
    pub async fn rebuild_columns_cache(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        debug!("PostgresBucket({}).rebuild_columns_cache", self.collection_id);
        let column_query =
            r#"SELECT column_name, is_nullable, data_type FROM information_schema.columns WHERE table_catalog = $1 AND table_name = $2;"#
                .to_string();
        let mut new_columns = Vec::new();
        debug!("Querying existing columns: {:?}, [{:?}, {:?}]", column_query, &self.schema, &self.collection_id.as_str());
        let rows = client
            .query(&column_query, &[&self.schema, &self.collection_id.as_str()])
            .await
            .map_err(|err| StateError::DDLError(Box::new(err)))?;
        for row in rows {
            let is_nullable: String = row.get("is_nullable");
            new_columns.push(PostgresColumn {
                name: row.get("column_name"),
                is_nullable: is_nullable.eq("YES"),
                data_type: row.get("data_type"),
            })
        }

        // Swap in the fresh snapshot, holding the write lock only briefly.
        let mut columns = self.columns.write().unwrap();
        *columns = new_columns;
        drop(columns);

        Ok(())
    }

    /// Names of all columns currently known (cached) on the state table.
    pub fn existing_columns(&self) -> Vec<String> {
        let columns = self.columns.read().unwrap();
        columns.iter().map(|column| column.name.clone()).collect()
    }

    /// Looks up a cached column by name.
    pub fn column(&self, column_name: &String) -> Option<PostgresColumn> {
        let columns = self.columns.read().unwrap();
        columns.iter().find(|column| column.name == *column_name).cloned()
    }

    /// True when the cached schema contains `column_name`.
    pub fn has_column(&self, column_name: &String) -> bool { self.column(column_name).is_some() }

    /// Creates the event table if it does not already exist.
    pub async fn create_event_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        let create_query = format!(
            r#"CREATE TABLE IF NOT EXISTS "{}"(
            "id" character(43) PRIMARY KEY,
            "entity_id" character(22),
            "operations" bytea,
            "parent" character(43)[],
            "attestations" bytea
        )"#,
            self.event_table()
        );

        debug!("{create_query}");
        client.execute(&create_query, &[]).await.map_err(|e| StateError::DDLError(Box::new(e)))?;
        Ok(())
    }

    /// Creates the state table if it does not already exist, logging database
    /// errors with their SQLSTATE code before wrapping them.
    pub async fn create_state_table(&self, client: &mut tokio_postgres::Client) -> Result<(), StateError> {
        let create_query = format!(
            r#"CREATE TABLE IF NOT EXISTS "{}"(
            "id" character(22) PRIMARY KEY,
            "state_buffer" BYTEA,
            "head" character(43)[],
            "attestations" BYTEA[]
        )"#,
            self.state_table()
        );

        debug!("{create_query}");
        match client.execute(&create_query, &[]).await {
            Ok(_) => Ok(()),
            Err(err) => {
                if let Some(db_err) = err.as_db_error() {
                    error!("PostgresBucket({}).create_state_table error: {} (code: {:?})", self.collection_id, db_err, db_err.code());
                } else {
                    error!("PostgresBucket({}).create_state_table error: {:?}", self.collection_id, err);
                }
                Err(StateError::DDLError(Box::new(err)))
            }
        }
    }

    /// Adds each `(name, postgres type)` in `missing` that is still absent
    /// after refreshing the column cache, under the collection's DDL advisory
    /// lock. The cache is rebuilt again afterwards (and on failure).
    pub async fn add_missing_columns(
        &self,
        client: &mut tokio_postgres::Client,
        missing: Vec<(String, &'static str)>, ) -> Result<(), StateError> {
        if missing.is_empty() {
            return Ok(());
        }

        // Serialize schema changes for this collection across processes.
        let lock_key = acquire_ddl_lock(client, self.collection_id.as_str()).await?;

        let result = async {
            // Another writer may have added the columns while we waited.
            self.rebuild_columns_cache(client).await?;

            for (column, datatype) in missing {
                if Postgres::sane_name(&column) && !self.has_column(&column) {
                    let alter_query = format!(r#"ALTER TABLE "{}" ADD COLUMN "{}" {}"#, self.state_table(), column, datatype);
                    info!("PostgresBucket({}).add_missing_columns: {}", self.collection_id, alter_query);
                    match client.execute(&alter_query, &[]).await {
                        Ok(_) => {}
                        Err(err) => {
                            if let Some(db_err) = err.as_db_error() {
                                warn!(
                                    "Error adding column {} to table {}: {} (code: {:?})",
                                    column,
                                    self.state_table(),
                                    db_err,
                                    db_err.code()
                                );
                            } else {
                                warn!("Error adding column {} to table {}: {:?}", column, self.state_table(), err);
                            }
                            // Resync the cache before surfacing the failure.
                            self.rebuild_columns_cache(client).await?;
                            return Err(StateError::DDLError(Box::new(err)));
                        }
                    }
                }
            }

            self.rebuild_columns_cache(client).await?;
            Ok(())
        }
        .await;

        // Always release the advisory lock, then report the DDL outcome.
        release_ddl_lock(client, lock_key).await?;

        result
    }
}
339
340#[async_trait]
341impl StorageCollection for PostgresBucket {
342 async fn set_state(&self, state: Attested<EntityState>) -> Result<bool, MutationError> {
343 let state_buffers = bincode::serialize(&state.payload.state.state_buffers)?;
344 let attestations: Vec<Vec<u8>> = state.attestations.iter().map(bincode::serialize).collect::<Result<Vec<_>, _>>()?;
345 let id = state.payload.entity_id;
346
347 if state.payload.state.head.is_empty() {
349 warn!("Warning: Empty head detected for entity {}", id);
350 }
351
352 let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
353
354 let mut columns: Vec<String> = vec!["id".to_owned(), "state_buffer".to_owned(), "head".to_owned(), "attestations".to_owned()];
355 let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
356 params.push(&id);
357 params.push(&state_buffers);
358 params.push(&state.payload.state.head);
359 params.push(&attestations);
360
361 let mut materialized: Vec<(String, Option<PGValue>)> = Vec::new();
362 let mut seen_properties = std::collections::HashSet::new();
363
364 for (name, state_buffer) in state.payload.state.state_buffers.iter() {
366 let backend = backend_from_string(name, Some(state_buffer))?;
367 for (column, value) in backend.property_values() {
368 if !seen_properties.insert(column.clone()) {
369 continue;
374 }
375
376 let pg_value: Option<PGValue> = value.map(|value| value.into());
377 if !self.has_column(&column) {
378 if let Some(ref pg_value) = pg_value {
380 self.add_missing_columns(&mut client, vec![(column.clone(), pg_value.postgres_type())]).await?;
381 } else {
382 continue;
386 }
387 }
388
389 materialized.push((column.clone(), pg_value));
390 }
391 }
392
393 for (name, parameter) in &materialized {
394 columns.push(name.clone());
395
396 match ¶meter {
397 Some(value) => match value {
398 PGValue::CharacterVarying(string) => params.push(string),
399 PGValue::SmallInt(number) => params.push(number),
400 PGValue::Integer(number) => params.push(number),
401 PGValue::BigInt(number) => params.push(number),
402 PGValue::DoublePrecision(float) => params.push(float),
403 PGValue::Bytea(bytes) => params.push(bytes),
404 PGValue::Boolean(bool) => params.push(bool),
405 PGValue::Jsonb(json_val) => params.push(json_val),
406 },
407 None => params.push(&UntypedNull),
408 }
409 }
410 let columns_str = columns.iter().map(|name| format!("\"{}\"", name)).collect::<Vec<String>>().join(", ");
411 let values_str = params.iter().enumerate().map(|(index, _)| format!("${}", index + 1)).collect::<Vec<String>>().join(", ");
412 let columns_update_str = columns
413 .iter()
414 .enumerate()
415 .skip(1) .map(|(index, name)| format!("\"{}\" = ${}", name, index + 1))
417 .collect::<Vec<String>>()
418 .join(", ");
419
420 let query = format!(
422 r#"WITH old_state AS (
423 SELECT "head" FROM "{0}" WHERE "id" = $1
424 )
425 INSERT INTO "{0}"({1}) VALUES({2})
426 ON CONFLICT("id") DO UPDATE SET {3}
427 RETURNING (SELECT "head" FROM old_state) as old_head"#,
428 self.state_table(),
429 columns_str,
430 values_str,
431 columns_update_str
432 );
433
434 debug!("PostgresBucket({}).set_state: {}", self.collection_id, query);
435 let mut created_table = false;
436 let row = loop {
437 match client.query_one(&query, params.as_slice()).await {
438 Ok(row) => break row,
439 Err(err) => {
440 let kind = error_kind(&err);
441 if let ErrorKind::UndefinedTable { table } = kind {
442 if table == self.state_table() && !created_table {
443 self.create_state_table(&mut client).await?;
444 created_table = true;
445 continue; }
447 }
448 return Err(StateError::DDLError(Box::new(err)).into());
449 }
450 }
451 };
452
453 let old_head: Option<Clock> = row.get("old_head");
455 let changed = match old_head {
456 None => true, Some(old_head) => old_head != state.payload.state.head,
458 };
459
460 debug!("PostgresBucket({}).set_state: Changed: {}", self.collection_id, changed);
461 Ok(changed)
462 }
463
464 async fn get_state(&self, id: EntityId) -> Result<Attested<EntityState>, RetrievalError> {
465 let query = format!(r#"SELECT "id", "state_buffer", "head", "attestations" FROM "{}" WHERE "id" = $1"#, self.state_table());
467
468 let mut client = match self.pool.get().await {
469 Ok(client) => client,
470 Err(err) => {
471 return Err(RetrievalError::StorageError(err.into()));
472 }
473 };
474
475 debug!("PostgresBucket({}).get_state: {}", self.collection_id, query);
476 let rows = match client.query(&query, &[&id]).await {
477 Ok(rows) => rows,
478 Err(err) => {
479 let kind = error_kind(&err);
480 if let ErrorKind::UndefinedTable { table } = kind {
481 if table == self.state_table() {
482 self.create_state_table(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
483 return Err(RetrievalError::EntityNotFound(id));
484 }
485 }
486 return Err(RetrievalError::StorageError(err.into()));
487 }
488 };
489
490 let row = match rows.into_iter().next() {
491 Some(row) => row,
492 None => return Err(RetrievalError::EntityNotFound(id)),
493 };
494
495 debug!("PostgresBucket({}).get_state: Row: {:?}", self.collection_id, row);
496 let row_id: EntityId = row.try_get("id").map_err(RetrievalError::storage)?;
497 assert_eq!(row_id, id);
498
499 let serialized_buffers: Vec<u8> = row.try_get("state_buffer").map_err(RetrievalError::storage)?;
500 let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&serialized_buffers).map_err(RetrievalError::storage)?;
501 let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
502 let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
503 let attestations = attestation_bytes
504 .into_iter()
505 .map(|bytes| bincode::deserialize(&bytes))
506 .collect::<Result<Vec<Attestation>, _>>()
507 .map_err(RetrievalError::storage)?;
508
509 Ok(Attested {
510 payload: EntityState {
511 entity_id: id,
512 collection: self.collection_id.clone(),
513 state: State { state_buffers: StateBuffers(state_buffers), head },
514 },
515 attestations: AttestationSet(attestations),
516 })
517 }
518
519 async fn fetch_states(&self, selection: &ankql::ast::Selection) -> Result<Vec<Attested<EntityState>>, RetrievalError> {
520 debug!("fetch_states: {:?}", selection);
521 let mut client = self.pool.get().await.map_err(|err| RetrievalError::StorageError(Box::new(err)))?;
522
523 let referenced = selection.referenced_columns();
528 let cached = self.existing_columns();
529 let unknown_to_cache: Vec<&String> = referenced.iter().filter(|col| !cached.contains(col)).collect();
530
531 if !unknown_to_cache.is_empty() {
533 debug!("PostgresBucket({}).fetch_states: Unknown columns {:?}, refreshing schema cache", self.collection_id, unknown_to_cache);
534 self.rebuild_columns_cache(&mut client).await.map_err(|e| RetrievalError::StorageError(e.into()))?;
535 }
536
537 let existing = self.existing_columns();
539 let missing: Vec<String> = referenced.into_iter().filter(|col| !existing.contains(col)).collect();
540
541 let effective_selection = if missing.is_empty() {
542 selection.clone()
543 } else {
544 debug!("PostgresBucket({}).fetch_states: Columns {:?} don't exist, treating as NULL", self.collection_id, missing);
545 selection.assume_null(&missing)
546 };
547
548 let split = sql_builder::split_predicate_for_postgres(&effective_selection.predicate);
550 let needs_post_filter = split.needs_post_filter();
551 let remaining_predicate = split.remaining_predicate; debug!(
553 "PostgresBucket({}).fetch_states: SQL predicate: {:?}, remaining: {:?}, needs_post_filter: {}",
554 self.collection_id, split.sql_predicate, remaining_predicate, needs_post_filter
555 );
556
557 #[cfg(debug_assertions)]
559 {
560 let mut spilled = self.last_spilled_predicate.write().unwrap();
561 *spilled = if needs_post_filter { Some(remaining_predicate.clone()) } else { None };
562 }
563
564 #[cfg(debug_assertions)]
566 {
567 let spilled = if needs_post_filter { Some(remaining_predicate.clone()) } else { None };
568 *self.last_spilled_predicate.write().unwrap() = spilled;
569 }
570
571 let sql_selection = ankql::ast::Selection {
573 predicate: split.sql_predicate,
574 order_by: effective_selection.order_by.clone(),
575 limit: if needs_post_filter {
576 None } else {
578 effective_selection.limit
579 },
580 };
581
582 let mut results = Vec::new();
583 let mut builder = SqlBuilder::with_fields(vec!["id", "state_buffer", "head", "attestations"]);
584 builder.table_name(self.state_table());
585 builder.selection(&sql_selection)?;
586
587 let (sql, args) = builder.build()?;
588 debug!("PostgresBucket({}).fetch_states: SQL: {} with args: {:?}", self.collection_id, sql, args);
589
590 let stream = match client.query_raw(&sql, args).await {
591 Ok(stream) => stream,
592 Err(err) => {
593 let kind = error_kind(&err);
594 if let ErrorKind::UndefinedTable { table } = kind {
595 if table == self.state_table() {
596 return Ok(Vec::new());
598 }
599 }
600 return Err(RetrievalError::StorageError(err.into()));
601 }
602 };
603 pin_mut!(stream);
604
605 while let Some(row) = stream.try_next().await.map_err(RetrievalError::storage)? {
606 let id: EntityId = row.try_get(0).map_err(RetrievalError::storage)?;
607 let state_buffer: Vec<u8> = row.try_get(1).map_err(RetrievalError::storage)?;
608 let state_buffers: BTreeMap<String, Vec<u8>> = bincode::deserialize(&state_buffer).map_err(RetrievalError::storage)?;
609 let head: Clock = row.try_get("head").map_err(RetrievalError::storage)?;
610 let attestation_bytes: Vec<Vec<u8>> = row.try_get("attestations").map_err(RetrievalError::storage)?;
611 let attestations = attestation_bytes
612 .into_iter()
613 .map(|bytes| bincode::deserialize(&bytes))
614 .collect::<Result<Vec<Attestation>, _>>()
615 .map_err(RetrievalError::storage)?;
616
617 results.push(Attested {
618 payload: EntityState {
619 entity_id: id,
620 collection: self.collection_id.clone(),
621 state: State { state_buffers: StateBuffers(state_buffers), head },
622 },
623 attestations: AttestationSet(attestations),
624 });
625 }
626
627 let results = if needs_post_filter {
629 debug!(
630 "PostgresBucket({}).fetch_states: Post-filtering {} results with remaining predicate",
631 self.collection_id,
632 results.len()
633 );
634 let filtered = post_filter_states(&results, &remaining_predicate, &self.collection_id);
635
636 if let Some(limit) = effective_selection.limit {
638 filtered.into_iter().take(limit as usize).collect()
639 } else {
640 filtered
641 }
642 } else {
643 results
644 };
645
646 Ok(results)
647 }
648
649 async fn add_event(&self, entity_event: &Attested<Event>) -> Result<bool, MutationError> {
650 let operations = bincode::serialize(&entity_event.payload.operations)?;
651 let attestations = bincode::serialize(&entity_event.attestations)?;
652
653 let query = format!(
654 r#"INSERT INTO "{0}"("id", "entity_id", "operations", "parent", "attestations") VALUES($1, $2, $3, $4, $5)
655 ON CONFLICT ("id") DO NOTHING"#,
656 self.event_table(),
657 );
658
659 let mut client = self.pool.get().await.map_err(|err| MutationError::General(err.into()))?;
660 debug!("PostgresBucket({}).add_event: {}", self.collection_id, query);
661 let mut created_table = false;
662 let affected = loop {
663 match client
664 .execute(
665 &query,
666 &[
667 &entity_event.payload.id(),
668 &entity_event.payload.entity_id,
669 &operations,
670 &entity_event.payload.parent,
671 &attestations,
672 ],
673 )
674 .await
675 {
676 Ok(affected) => break affected,
677 Err(err) => {
678 let kind = error_kind(&err);
679 if let ErrorKind::UndefinedTable { table } = kind {
680 if table == self.event_table() && !created_table {
681 self.create_event_table(&mut client).await?;
682 created_table = true;
683 continue; }
685 }
686 error!("PostgresBucket({}).add_event: Error: {:?}", self.collection_id, err);
687 return Err(StateError::DMLError(Box::new(err)).into());
688 }
689 }
690 };
691
692 Ok(affected > 0)
693 }
694
695 async fn get_events(&self, event_ids: Vec<EventId>) -> Result<Vec<Attested<Event>>, RetrievalError> {
696 if event_ids.is_empty() {
697 return Ok(Vec::new());
698 }
699
700 let query = format!(
701 r#"SELECT "id", "entity_id", "operations", "parent", "attestations" FROM "{0}" WHERE "id" = ANY($1)"#,
702 self.event_table(),
703 );
704
705 let client = self.pool.get().await.map_err(RetrievalError::storage)?;
706 let rows = match client.query(&query, &[&event_ids]).await {
707 Ok(rows) => rows,
708 Err(err) => {
709 let kind = error_kind(&err);
710 match kind {
711 ErrorKind::UndefinedTable { table } if table == self.event_table() => return Ok(Vec::new()),
712 _ => return Err(RetrievalError::storage(err)),
713 }
714 }
715 };
716
717 let mut events = Vec::new();
718 for row in rows {
719 let entity_id: EntityId = row.try_get("entity_id").map_err(RetrievalError::storage)?;
720 let operations: OperationSet = row.try_get("operations").map_err(RetrievalError::storage)?;
721 let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
722 let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
723 let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;
724
725 let event = Attested {
726 payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
727 attestations: AttestationSet(attestations),
728 };
729 events.push(event);
730 }
731 Ok(events)
732 }
733
734 async fn dump_entity_events(&self, entity_id: EntityId) -> Result<Vec<Attested<Event>>, ankurah_core::error::RetrievalError> {
735 let query =
736 format!(r#"SELECT "id", "operations", "parent", "attestations" FROM "{0}" WHERE "entity_id" = $1"#, self.event_table(),);
737
738 let client = self.pool.get().await.map_err(RetrievalError::storage)?;
739 debug!("PostgresBucket({}).get_events: {}", self.collection_id, query);
740 let rows = match client.query(&query, &[&entity_id]).await {
741 Ok(rows) => rows,
742 Err(err) => {
743 let kind = error_kind(&err);
744 if let ErrorKind::UndefinedTable { table } = kind {
745 if table == self.event_table() {
746 return Ok(Vec::new());
747 }
748 }
749
750 return Err(RetrievalError::storage(err));
751 }
752 };
753
754 let mut events = Vec::new();
755 for row in rows {
756 let operations_binary: Vec<u8> = row.try_get("operations").map_err(RetrievalError::storage)?;
758 let operations = bincode::deserialize(&operations_binary).map_err(RetrievalError::storage)?;
759 let parent: Clock = row.try_get("parent").map_err(RetrievalError::storage)?;
760 let attestations_binary: Vec<u8> = row.try_get("attestations").map_err(RetrievalError::storage)?;
761 let attestations: Vec<Attestation> = bincode::deserialize(&attestations_binary).map_err(RetrievalError::storage)?;
762
763 events.push(Attested {
764 payload: Event { collection: self.collection_id.clone(), entity_id, operations, parent },
765 attestations: AttestationSet(attestations),
766 });
767 }
768
769 Ok(events)
770 }
771}
772
/// Classified shape of a `tokio_postgres::Error`, produced by `error_kind`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ErrorKind {
    /// Client-side "query returned an unexpected number of rows" (e.g. `query_one` on zero rows).
    RowCount,
    /// SQLSTATE 42P01 (undefined table); `table` is parsed from the quoted message.
    UndefinedTable { table: String },
    /// SQLSTATE 42703 (undefined column); names are parsed from the quoted message.
    UndefinedColumn { table: Option<String>, column: String },
    /// Any error not specifically classified here.
    Unknown,
    /// Fallback carrying the raw server message when identifier parsing failed.
    PostgresError(String),
}
784
785pub fn error_kind(err: &tokio_postgres::Error) -> ErrorKind {
786 let string = err.as_db_error().map(|e| e.message()).unwrap_or_default().trim().to_owned();
787 let _db_error = err.as_db_error();
788 let sql_code = err.code().cloned();
789
790 let err_string = err.to_string();
792 if err_string.contains("query returned an unexpected number of rows") || string == "query returned an unexpected number of rows" {
793 return ErrorKind::RowCount;
794 }
795
796 debug!("postgres error: {:?}", err);
803
804 let quote_indices = |s: &str| {
805 let mut quotes = Vec::new();
806 for (index, char) in s.char_indices() {
807 if char == '"' {
808 quotes.push(index)
809 }
810 }
811 quotes
812 };
813
814 match sql_code {
815 Some(SqlState::UNDEFINED_TABLE) => {
816 let quotes = quote_indices(&string);
818 if quotes.len() >= 2 {
819 let table = &string[quotes[0] + 1..quotes[1]];
820 ErrorKind::UndefinedTable { table: table.to_owned() }
821 } else {
822 ErrorKind::PostgresError(string.clone())
823 }
824 }
825 Some(SqlState::UNDEFINED_COLUMN) => {
826 let quotes = quote_indices(&string);
830 if quotes.len() >= 2 {
831 let column = string[quotes[0] + 1..quotes[1]].to_owned();
832
833 let table = if quotes.len() >= 4 {
834 Some(string[quotes[2] + 1..quotes[3]].to_owned())
836 } else {
837 None
839 };
840
841 ErrorKind::UndefinedColumn { table, column }
842 } else {
843 ErrorKind::PostgresError(string.clone())
844 }
845 }
846 _ => ErrorKind::Unknown,
847 }
848}
849
/// Name of a property that could not be materialized into a typed column.
#[allow(unused)]
pub struct MissingMaterialized {
    pub name: String,
}
854
855use bytes::BytesMut;
856use tokio_postgres::types::{to_sql_checked, IsNull, Type};
857
858use crate::sql_builder::SqlBuilder;
859
860fn post_filter_states(
865 states: &[Attested<EntityState>],
866 predicate: &ankql::ast::Predicate,
867 collection_id: &CollectionId,
868) -> Vec<Attested<EntityState>> {
869 use ankurah_core::entity::TemporaryEntity;
870 use ankurah_core::selection::filter::evaluate_predicate;
871
872 states
873 .iter()
874 .filter(|attested| {
875 match TemporaryEntity::new(attested.payload.entity_id, collection_id.clone(), &attested.payload.state) {
877 Ok(temp_entity) => {
878 match evaluate_predicate(&temp_entity, predicate) {
880 Ok(true) => true,
881 Ok(false) => false,
882 Err(e) => {
883 warn!("Post-filter evaluation error for entity {}: {}", attested.payload.entity_id, e);
884 false }
886 }
887 }
888 Err(e) => {
889 warn!("Failed to create TemporaryEntity for post-filtering {}: {}", attested.payload.entity_id, e);
890 false }
892 }
893 })
894 .cloned()
895 .collect()
896}
897
/// A SQL NULL parameter usable against any column type: `accepts` claims
/// compatibility with every `Type`, and `to_sql` always writes NULL.
#[derive(Debug)]
struct UntypedNull;

impl ToSql for UntypedNull {
    // Writes nothing and signals NULL regardless of the declared column type.
    fn to_sql(&self, _ty: &Type, _out: &mut BytesMut) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> { Ok(IsNull::Yes) }

    fn accepts(_ty: &Type) -> bool {
        true
    }

    to_sql_checked!();
}