Skip to main content

cratestack_sqlx/query/batch/
get.rs

1//! `batch_get` — single `SELECT ... WHERE pk IN (...) AND policy(...)`.
2//! Walk-and-match the returned rows back to input positions; absent
3//! PKs surface as `NotFound`.
4
5use std::collections::HashMap;
6use std::hash::Hash;
7
8use cratestack_core::{BatchResponse, CoolContext, CoolError};
9
10use crate::query::support::push_action_policy_query;
11use crate::{ModelDescriptor, ModelPrimaryKey, SqlxRuntime, sqlx};
12
13use super::validate::{reject_duplicate_pks, validate_batch_size};
14
15#[derive(Debug, Clone)]
16pub struct BatchGet<'a, M: 'static, PK: 'static> {
17    pub(crate) runtime: &'a SqlxRuntime,
18    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
19    pub(crate) ids: Vec<PK>,
20}
21
22impl<'a, M: 'static, PK: 'static> BatchGet<'a, M, PK> {
23    pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
24    where
25        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + ModelPrimaryKey<PK>,
26        PK: Clone
27            + Eq
28            + Hash
29            + Send
30            + sqlx::Type<sqlx::Postgres>
31            + for<'q> sqlx::Encode<'q, sqlx::Postgres>,
32    {
33        validate_batch_size(self.ids.len())?;
34        reject_duplicate_pks(&self.ids)?;
35        if self.ids.is_empty() {
36            return Ok(BatchResponse::from_results(vec![]));
37        }
38
39        // Single SELECT with IN-list + read policy + soft-delete filter.
40        let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new("SELECT ");
41        query.push(self.descriptor.select_projection());
42        query.push(" FROM ").push(self.descriptor.table_name);
43        query.push(" WHERE ");
44        if let Some(col) = self.descriptor.soft_delete_column {
45            query.push(col).push(" IS NULL AND ");
46        }
47        query.push(self.descriptor.primary_key).push(" IN (");
48        for (index, id) in self.ids.iter().enumerate() {
49            if index > 0 {
50                query.push(", ");
51            }
52            query.push_bind(id.clone());
53        }
54        query.push(") AND ");
55        push_action_policy_query(
56            &mut query,
57            self.descriptor.read_allow_policies,
58            self.descriptor.read_deny_policies,
59            ctx,
60        );
61
62        let rows: Vec<M> = query
63            .build_query_as::<M>()
64            .fetch_all(self.runtime.pool())
65            .await
66            .map_err(|error| CoolError::Database(error.to_string()))?;
67
68        // Walk-and-match: pair each input PK back to its row, or
69        // NotFound when the read policy / soft-delete excluded it.
70        let mut by_pk: HashMap<PK, M> = rows.into_iter().map(|m| (m.primary_key(), m)).collect();
71        let per_item: Vec<Result<M, CoolError>> = self
72            .ids
73            .into_iter()
74            .map(|id| {
75                by_pk
76                    .remove(&id)
77                    .ok_or_else(|| CoolError::NotFound("no row matched".to_owned()))
78            })
79            .collect();
80
81        Ok(BatchResponse::from_results(per_item))
82    }
83}