use chrono::{DateTime, Utc};
use es_entity::*;
use sqlx::PgPool;
use tracing::instrument;
use std::collections::HashMap;
use crate::{
outbox::OutboxPublisher,
primitives::{AccountId, JournalId},
};
use super::{entity::*, error::*};
const ADDVISORY_LOCK_ID: i64 = 123456;
pub mod members_cursor {
use cala_types::account_set::{
AccountSetMember, AccountSetMemberByExternalId, AccountSetMemberId,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountSetMemberByCreatedAtCursor {
pub id: AccountSetMemberId,
pub member_created_at: chrono::DateTime<chrono::Utc>,
}
impl From<&AccountSetMember> for AccountSetMemberByCreatedAtCursor {
fn from(member: &AccountSetMember) -> Self {
Self {
id: member.id,
member_created_at: member.created_at,
}
}
}
#[cfg(feature = "graphql")]
impl async_graphql::connection::CursorType for AccountSetMemberByCreatedAtCursor {
type Error = String;
fn encode_cursor(&self) -> String {
use base64::{engine::general_purpose, Engine as _};
let json = serde_json::to_string(&self).expect("could not serialize token");
general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
}
fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
use base64::{engine::general_purpose, Engine as _};
let bytes = general_purpose::STANDARD_NO_PAD
.decode(s.as_bytes())
.map_err(|e| e.to_string())?;
let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AccountSetMemberByExternalIdCursor {
pub id: AccountSetMemberId,
pub external_id: Option<String>,
}
impl From<&AccountSetMemberByExternalId> for AccountSetMemberByExternalIdCursor {
fn from(member: &AccountSetMemberByExternalId) -> Self {
Self {
id: member.id,
external_id: member.external_id.clone(),
}
}
}
#[cfg(feature = "graphql")]
impl async_graphql::connection::CursorType for AccountSetMemberByExternalIdCursor {
type Error = String;
fn encode_cursor(&self) -> String {
use base64::{engine::general_purpose, Engine as _};
let json = serde_json::to_string(&self).expect("could not serialize token");
general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
}
fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
use base64::{engine::general_purpose, Engine as _};
let bytes = general_purpose::STANDARD_NO_PAD
.decode(s.as_bytes())
.map_err(|e| e.to_string())?;
let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
serde_json::from_str(&json).map_err(|e| e.to_string())
}
}
}
use account_set_cursor::*;
use members_cursor::*;
#[derive(EsRepo, Debug, Clone)]
#[es_repo(
entity = "AccountSet",
columns(
name(
ty = "String",
update(accessor = "values().name"),
list_by,
list_for(by(created_at))
),
journal_id(ty = "JournalId", update(persist = false)),
external_id(
ty = "Option<String>",
update(accessor = "values().external_id"),
list_by
),
),
tbl_prefix = "cala",
post_persist_hook = "publish",
persist_event_context = false
)]
pub(super) struct AccountSetRepo {
pool: PgPool,
publisher: OutboxPublisher,
}
impl AccountSetRepo {
pub fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
Self {
pool: pool.clone(),
publisher: publisher.clone(),
}
}
pub async fn list_children_by_created_at(
&self,
id: AccountSetId,
args: es_entity::PaginatedQueryArgs<AccountSetMemberByCreatedAtCursor>,
) -> Result<
es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMemberByCreatedAtCursor>,
AccountSetError,
> {
self.list_children_by_created_at_in_op(&self.pool, id, args)
.await
}
#[instrument(
name = "account_set.list_children_by_created_at_in_op",
skip_all,
err(level = "warn")
)]
pub async fn list_children_by_created_at_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
account_set_id: AccountSetId,
args: es_entity::PaginatedQueryArgs<AccountSetMemberByCreatedAtCursor>,
) -> Result<
es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMemberByCreatedAtCursor>,
AccountSetError,
> {
let es_entity::PaginatedQueryArgs { first, after } = args;
let (member_id, created_at) = if let Some(after) = after {
(Some(after.id), Some(after.member_created_at))
} else {
(None, None)
};
let id = match member_id {
Some(member_id) => match member_id {
AccountSetMemberId::Account(id) => Some(id),
AccountSetMemberId::AccountSet(id) => Some(id.into()),
},
None => None,
};
let rows = op
.into_executor()
.fetch_all(sqlx::query!(
r#"
WITH member_accounts AS (
SELECT
member_account_id AS member_id,
member_account_id,
NULL::uuid AS member_account_set_id,
created_at
FROM cala_account_set_member_accounts
WHERE
transitive IS FALSE
AND account_set_id = $4
AND (COALESCE((created_at, member_account_id) < ($3, $2), $2 IS NULL))
ORDER BY created_at DESC, member_account_id DESC
LIMIT $1
), member_sets AS (
SELECT
member_account_set_id AS member_id,
NULL::uuid AS member_account_id,
member_account_set_id,
created_at
FROM cala_account_set_member_account_sets
WHERE
account_set_id = $4
AND (COALESCE((created_at, member_account_set_id) < ($3, $2), $2 IS NULL))
ORDER BY created_at DESC, member_account_set_id DESC
LIMIT $1
), all_members AS (
SELECT * FROM member_accounts
UNION ALL
SELECT * FROM member_sets
)
SELECT * FROM all_members
ORDER BY created_at DESC, member_id DESC
LIMIT $1
"#,
(first + 1) as i64,
id.map(uuid::Uuid::from),
created_at,
uuid::Uuid::from(account_set_id),
))
.await?;
let has_next_page = rows.len() > first;
let mut end_cursor = None;
if let Some(last) = rows.last() {
let id = last
.member_account_id
.map(|account_id| AccountSetMemberId::Account(account_id.into()))
.or_else(|| {
last.member_account_set_id
.map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
});
end_cursor = Some(AccountSetMemberByCreatedAtCursor {
id: id.expect("member_id not set"),
member_created_at: last.created_at.expect("created_at not set"),
});
}
let account_set_members = rows
.into_iter()
.take(first)
.map(
|row| match (row.member_account_id, row.member_account_set_id) {
(Some(member_account_id), _) => AccountSetMember::from((
AccountSetMemberId::Account(AccountId::from(member_account_id)),
row.created_at.expect("created at should always be present"),
)),
(_, Some(member_account_set_id)) => AccountSetMember::from((
AccountSetMemberId::AccountSet(AccountSetId::from(member_account_set_id)),
row.created_at.expect("created at should always be present"),
)),
_ => unreachable!(),
},
)
.collect::<Vec<AccountSetMember>>();
Ok(es_entity::PaginatedQueryRet {
entities: account_set_members,
has_next_page,
end_cursor,
})
}
pub async fn list_children_by_external_id(
&self,
id: AccountSetId,
args: es_entity::PaginatedQueryArgs<AccountSetMemberByExternalIdCursor>,
) -> Result<
es_entity::PaginatedQueryRet<
AccountSetMemberByExternalId,
AccountSetMemberByExternalIdCursor,
>,
AccountSetError,
> {
self.list_children_by_external_id_in_op(&self.pool, id, args)
.await
}
pub async fn list_children_by_external_id_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
account_set_id: AccountSetId,
args: es_entity::PaginatedQueryArgs<AccountSetMemberByExternalIdCursor>,
) -> Result<
es_entity::PaginatedQueryRet<
AccountSetMemberByExternalId,
AccountSetMemberByExternalIdCursor,
>,
AccountSetError,
> {
let es_entity::PaginatedQueryArgs { first, after } = args;
let (member_id, external_id) = if let Some(after) = after {
(Some(after.id), after.external_id)
} else {
(None, None)
};
let id = match member_id {
Some(member_id) => match member_id {
AccountSetMemberId::Account(id) => Some(id),
AccountSetMemberId::AccountSet(id) => Some(id.into()),
},
None => None,
};
let rows = op
.into_executor()
.fetch_all(sqlx::query!(
r#"
WITH member_accounts AS (
SELECT
member_account_id AS member_id,
member_account_id,
NULL::uuid AS member_account_set_id,
a.external_id
FROM cala_account_set_member_accounts m
LEFT JOIN cala_accounts a ON m.member_account_id = a.id
WHERE
transitive IS FALSE
AND m.account_set_id = $4
AND (
($3::varchar IS NULL) OR
(a.external_id IS NULL AND $3::varchar IS NOT NULL) OR
(a.external_id > $3::varchar) OR
(a.external_id = $3::varchar AND member_account_id > $2)
)
ORDER BY a.external_id ASC NULLS LAST, member_account_id ASC
LIMIT $1
), member_sets AS (
SELECT
member_account_set_id AS member_id,
NULL::uuid AS member_account_id,
member_account_set_id,
s.external_id
FROM cala_account_set_member_account_sets m
LEFT JOIN cala_account_sets s ON m.member_account_set_id = s.id
WHERE
m.account_set_id = $4
AND (
($3::varchar IS NULL) OR
(s.external_id IS NULL AND $3::varchar IS NOT NULL) OR
(s.external_id > $3::varchar) OR
(s.external_id = $3::varchar AND member_account_set_id > $2)
)
ORDER BY s.external_id ASC NULLS LAST, member_account_set_id ASC
LIMIT $1
), all_members AS (
SELECT * FROM member_accounts
UNION ALL
SELECT * FROM member_sets
)
SELECT * FROM all_members
ORDER BY external_id ASC NULLS LAST, member_id ASC
LIMIT $1
"#,
(first + 1) as i64,
id.map(uuid::Uuid::from),
external_id,
uuid::Uuid::from(account_set_id),
))
.await?;
let has_next_page = rows.len() > first;
let mut end_cursor = None;
if let Some(last) = rows.last() {
let id = last
.member_account_id
.map(|account_id| AccountSetMemberId::Account(account_id.into()))
.or_else(|| {
last.member_account_set_id
.map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
});
end_cursor = Some(AccountSetMemberByExternalIdCursor {
id: id.expect("member_id not set"),
external_id: last.external_id.clone(),
});
}
let account_set_members = rows
.into_iter()
.take(first)
.map(
|row| match (row.member_account_id, row.member_account_set_id) {
(Some(member_account_id), _) => AccountSetMemberByExternalId {
id: AccountSetMemberId::Account(AccountId::from(member_account_id)),
external_id: row.external_id,
},
(_, Some(member_account_set_id)) => AccountSetMemberByExternalId {
id: AccountSetMemberId::AccountSet(AccountSetId::from(
member_account_set_id,
)),
external_id: row.external_id,
},
_ => unreachable!(),
},
)
.collect::<Vec<AccountSetMemberByExternalId>>();
Ok(es_entity::PaginatedQueryRet {
entities: account_set_members,
has_next_page,
end_cursor,
})
}
#[instrument(
name = "account_set.add_member_account_and_return_parents",
skip_all,
err(level = "warn")
)]
pub async fn add_member_account_and_return_parents(
&self,
db: &mut impl es_entity::AtomicOperation,
account_set_id: AccountSetId,
account_id: AccountId,
) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
.execute(db.as_executor())
.await?;
let rows = sqlx::query!(r#"
WITH RECURSIVE parents AS (
SELECT m.member_account_set_id, m.account_set_id
FROM cala_account_set_member_account_sets m
JOIN cala_account_sets s
ON s.id = m.account_set_id
WHERE m.member_account_set_id = $1
UNION ALL
SELECT p.member_account_set_id, m.account_set_id
FROM parents p
JOIN cala_account_set_member_account_sets m
ON p.account_set_id = m.member_account_set_id
),
non_transitive_insert AS (
INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id)
VALUES ($1, $2)
),
transitive_insert AS (
INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
SELECT p.account_set_id, $2, TRUE
FROM parents p
)
SELECT account_set_id, NULL AS now
FROM parents
UNION ALL
SELECT NULL AS account_set_id, NOW() AS now
"#,
account_set_id as AccountSetId,
account_id as AccountId,
)
.fetch_all(db.as_executor())
.await?;
let mut time = None;
let ret = rows
.into_iter()
.filter_map(|row| {
if let Some(t) = row.now {
time = Some(t);
None
} else {
Some(AccountSetId::from(
row.account_set_id.expect("account_set_id not set"),
))
}
})
.collect();
self.publisher
.publish_all(
db,
std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
account_set_id,
member_id: crate::account_set::AccountSetMemberId::Account(account_id),
}),
)
.await?;
Ok((time.expect("time not set"), ret))
}
#[instrument(
name = "account_set.remove_member_account_and_return_parents",
skip_all,
err(level = "warn")
)]
pub async fn remove_member_account_and_return_parents(
&self,
db: &mut impl es_entity::AtomicOperation,
account_set_id: AccountSetId,
account_id: AccountId,
) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
.execute(db.as_executor())
.await?;
let rows = sqlx::query!(
r#"
WITH RECURSIVE parents AS (
SELECT m.member_account_set_id, m.account_set_id
FROM cala_account_set_member_account_sets m
JOIN cala_account_sets s
ON s.id = m.account_set_id
WHERE m.member_account_set_id = $1
UNION ALL
SELECT p.member_account_set_id, m.account_set_id
FROM parents p
JOIN cala_account_set_member_account_sets m
ON p.account_set_id = m.member_account_set_id
),
deletions as (
DELETE FROM cala_account_set_member_accounts
WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
AND member_account_id = $2
)
SELECT account_set_id, NULL AS now
FROM parents
UNION ALL
SELECT NULL AS account_set_id, NOW() AS now
"#,
account_set_id as AccountSetId,
account_id as AccountId,
)
.fetch_all(db.as_executor())
.await?;
let mut time = None;
let ret = rows
.into_iter()
.filter_map(|row| {
if let Some(t) = row.now {
time = Some(t);
None
} else {
Some(AccountSetId::from(
row.account_set_id.expect("account_set_id not set"),
))
}
})
.collect();
self.publisher
.publish_all(
db,
std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
account_set_id,
member_id: crate::account_set::AccountSetMemberId::Account(account_id),
}),
)
.await?;
Ok((time.expect("time not set"), ret))
}
#[instrument(
name = "account_set.add_member_set_and_return_parents",
skip_all,
err(level = "warn")
)]
pub async fn add_member_set_and_return_parents(
&self,
db: &mut impl es_entity::AtomicOperation,
account_set_id: AccountSetId,
member_account_set_id: AccountSetId,
) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
.execute(db.as_executor())
.await?;
let rows = sqlx::query!(r#"
WITH RECURSIVE parents AS (
SELECT m.member_account_set_id, m.account_set_id
FROM cala_account_set_member_account_sets m
JOIN cala_account_sets s
ON s.id = m.account_set_id
WHERE m.member_account_set_id = $1
UNION ALL
SELECT p.member_account_set_id, m.account_set_id
FROM parents p
JOIN cala_account_set_member_account_sets m
ON p.account_set_id = m.member_account_set_id
),
set_insert AS (
INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id)
VALUES ($1, $2)
),
new_members AS (
INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
SELECT $1, m.member_account_id, TRUE
FROM cala_account_set_member_accounts m
WHERE m.account_set_id = $2
RETURNING member_account_id
),
transitive_inserts AS (
INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
SELECT p.account_set_id, n.member_account_id, TRUE
FROM parents p
CROSS JOIN new_members n
)
SELECT account_set_id, NULL AS now
FROM parents
UNION ALL
SELECT NULL AS account_set_id, NOW() AS now
"#,
account_set_id as AccountSetId,
member_account_set_id as AccountSetId,
)
.fetch_all(db.as_executor())
.await?;
let mut time = None;
let ret = rows
.into_iter()
.filter_map(|row| {
if let Some(t) = row.now {
time = Some(t);
None
} else {
Some(AccountSetId::from(
row.account_set_id.expect("account_set_id not set"),
))
}
})
.collect();
self.publisher
.publish_all(
db,
std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
account_set_id,
member_id: crate::account_set::AccountSetMemberId::AccountSet(
member_account_set_id,
),
}),
)
.await?;
Ok((time.expect("time not set"), ret))
}
#[instrument(
name = "account_set.remove_member_set_and_return_parents",
skip_all,
err(level = "warn")
)]
pub async fn remove_member_set_and_return_parents(
&self,
db: &mut impl es_entity::AtomicOperation,
account_set_id: AccountSetId,
member_account_set_id: AccountSetId,
) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
.execute(db.as_executor())
.await?;
let rows = sqlx::query!(
r#"
WITH RECURSIVE parents AS (
SELECT m.member_account_set_id, m.account_set_id
FROM cala_account_set_member_account_sets m
JOIN cala_account_sets s
ON s.id = m.account_set_id
WHERE m.member_account_set_id = $1
UNION ALL
SELECT p.member_account_set_id, m.account_set_id
FROM parents p
JOIN cala_account_set_member_account_sets m
ON p.account_set_id = m.member_account_set_id
),
member_accounts_deletion AS (
DELETE FROM cala_account_set_member_accounts
WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
AND member_account_id IN (SELECT member_account_id FROM cala_account_set_member_accounts
WHERE account_set_id = $2)
),
member_account_set_deletion AS (
DELETE FROM cala_account_set_member_account_sets
WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
AND member_account_set_id = $2
)
SELECT account_set_id, NULL AS now
FROM parents
UNION ALL
SELECT NULL AS account_set_id, NOW() AS now
"#,
account_set_id as AccountSetId,
member_account_set_id as AccountSetId,
)
.fetch_all(db.as_executor())
.await?;
let mut time = None;
let ret = rows
.into_iter()
.filter_map(|row| {
if let Some(t) = row.now {
time = Some(t);
None
} else {
Some(AccountSetId::from(
row.account_set_id.expect("account_set_id not set"),
))
}
})
.collect();
self.publisher
.publish_all(
db,
std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
account_set_id,
member_id: crate::account_set::AccountSetMemberId::AccountSet(
member_account_set_id,
),
}),
)
.await?;
Ok((time.expect("time not set"), ret))
}
pub async fn find_where_account_is_member(
&self,
account_id: AccountId,
query: es_entity::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError>
{
self.find_where_account_is_member_in_op(&self.pool, account_id, query)
.await
}
pub async fn find_where_account_is_member_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
account_id: AccountId,
query: es_entity::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError>
{
let (entities, has_next_page) = es_entity::es_query!(
tbl_prefix = "cala",
r#"SELECT a.id, a.name, a.created_at
FROM cala_account_sets a
JOIN cala_account_set_member_accounts asm
ON asm.account_set_id = a.id
WHERE asm.member_account_id = $1 AND transitive IS FALSE
AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
ORDER BY a.name, a.id
LIMIT $4"#,
account_id as AccountId,
query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_n(op, query.first)
.await?;
let mut end_cursor = None;
if let Some(last) = entities.last() {
end_cursor = Some(AccountSetByNameCursor {
id: last.values().id,
name: last.values().name.clone(),
});
}
Ok(es_entity::PaginatedQueryRet {
entities,
has_next_page,
end_cursor,
})
}
pub async fn find_where_account_set_is_member(
&self,
account_set_id: AccountSetId,
query: es_entity::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError>
{
self.find_where_account_set_is_member_in_op(&self.pool, account_set_id, query)
.await
}
pub async fn find_where_account_set_is_member_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
account_set_id: AccountSetId,
query: es_entity::PaginatedQueryArgs<AccountSetByNameCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetByNameCursor>, AccountSetError>
{
let (entities, has_next_page) = es_entity::es_query!(
tbl_prefix = "cala",
r#"SELECT a.id, a.name, a.created_at
FROM cala_account_sets a
JOIN cala_account_set_member_account_sets asm
ON asm.account_set_id = a.id
WHERE asm.member_account_set_id = $1
AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
ORDER BY a.name, a.id
LIMIT $4"#,
account_set_id as AccountSetId,
query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
query.after.map(|c| c.name),
query.first as i64 + 1
)
.fetch_n(op, query.first)
.await?;
let mut end_cursor = None;
if let Some(last) = entities.last() {
end_cursor = Some(AccountSetByNameCursor {
id: last.values().id,
name: last.values().name.clone(),
});
}
Ok(es_entity::PaginatedQueryRet {
entities,
has_next_page,
end_cursor,
})
}
#[instrument(
name = "account_set.fetch_mappings_in_op",
skip_all,
err(level = "warn")
)]
pub async fn fetch_mappings_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
journal_id: JournalId,
account_ids: &[AccountId],
) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
let rows = op.into_executor().fetch_all(sqlx::query!(
r#"
SELECT m.account_set_id AS "set_id!: AccountSetId", m.member_account_id AS "account_id!: AccountId"
FROM cala_account_set_member_accounts m
JOIN cala_account_sets s
ON m.account_set_id = s.id AND s.journal_id = $1
WHERE m.member_account_id = ANY($2)
"#,
journal_id as JournalId,
account_ids as &[AccountId]
))
.await?;
let mut mappings = HashMap::new();
for row in rows {
mappings
.entry(row.account_id)
.or_insert_with(Vec::new)
.push(row.set_id);
}
Ok(mappings)
}
pub async fn list_eventually_consistent_ids(
&self,
args: es_entity::PaginatedQueryArgs<AccountSetByIdCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSetId, AccountSetByIdCursor>, AccountSetError>
{
self.list_eventually_consistent_ids_in_op(&self.pool, args)
.await
}
#[instrument(
name = "account_set.list_eventually_consistent_ids_in_op",
skip_all,
err(level = "warn")
)]
pub async fn list_eventually_consistent_ids_in_op(
&self,
op: impl es_entity::IntoOneTimeExecutor<'_>,
args: es_entity::PaginatedQueryArgs<AccountSetByIdCursor>,
) -> Result<es_entity::PaginatedQueryRet<AccountSetId, AccountSetByIdCursor>, AccountSetError>
{
let es_entity::PaginatedQueryArgs { first, after } = args;
let rows = op
.into_executor()
.fetch_all(sqlx::query!(
r#"
SELECT s.id AS "id!: AccountSetId"
FROM cala_account_sets s
JOIN cala_accounts a ON s.id = a.id
WHERE a.eventually_consistent = TRUE
AND ($2::uuid IS NULL OR s.id > $2)
ORDER BY s.id ASC
LIMIT $1
"#,
(first + 1) as i64,
after.map(|c| uuid::Uuid::from(c.id)),
))
.await?;
let has_next_page = rows.len() > first;
let entities: Vec<AccountSetId> = rows.into_iter().take(first).map(|r| r.id).collect();
let end_cursor = entities.last().map(|id| AccountSetByIdCursor { id: *id });
Ok(es_entity::PaginatedQueryRet {
entities,
has_next_page,
end_cursor,
})
}
#[instrument(
name = "account_set.find_all_ec_descendant_set_ids",
skip_all,
err(level = "warn")
)]
pub async fn find_all_ec_descendant_set_ids(
&self,
op: &mut impl es_entity::AtomicOperation,
account_set_ids: &[AccountSetId],
) -> Result<Vec<AccountSetId>, AccountSetError> {
let rows = sqlx::query!(
r#"
WITH RECURSIVE descendants AS (
SELECT member_account_set_id AS id
FROM cala_account_set_member_account_sets
WHERE account_set_id = ANY($1)
UNION
SELECT m.member_account_set_id
FROM cala_account_set_member_account_sets m
JOIN descendants d ON d.id = m.account_set_id
)
SELECT d.id AS "id!: AccountSetId"
FROM descendants d
JOIN cala_accounts a ON a.id = d.id
WHERE a.eventually_consistent = TRUE
"#,
account_set_ids as &[AccountSetId],
)
.fetch_all(op.as_executor())
.await?;
Ok(rows.into_iter().map(|r| r.id).collect())
}
async fn publish(
&self,
op: &mut impl es_entity::AtomicOperation,
entity: &AccountSet,
new_events: es_entity::LastPersisted<'_, AccountSetEvent>,
) -> Result<(), sqlx::Error> {
self.publisher
.publish_entity_events(op, entity, new_events)
.await?;
Ok(())
}
}