Skip to main content

cala_ledger/account_set/
repo.rs

1use chrono::{DateTime, Utc};
2use es_entity::*;
3use sqlx::PgPool;
4use tracing::instrument;
5
6use std::collections::HashMap;
7
8use crate::{
9    outbox::OutboxPublisher,
10    primitives::{AccountId, JournalId},
11};
12
13use super::{entity::*, error::*};
14
// Key handed to `pg_advisory_xact_lock` by every membership add/remove method
// in this file, so all of those operations take the same advisory lock key.
// NOTE(review): the identifier is misspelled ("ADDVISORY"); a rename has to
// touch every use site at once.
const ADDVISORY_LOCK_ID: i64 = 123456;
16
17pub mod members_cursor {
18    use cala_types::account_set::{
19        AccountSetMember, AccountSetMemberByExternalId, AccountSetMemberId,
20    };
21    use serde::{Deserialize, Serialize};
22
23    #[derive(Debug, Serialize, Deserialize)]
24    pub struct AccountSetMembersByCreatedAtCursor {
25        pub id: AccountSetMemberId,
26        pub member_created_at: chrono::DateTime<chrono::Utc>,
27    }
28
29    impl From<&AccountSetMember> for AccountSetMembersByCreatedAtCursor {
30        fn from(member: &AccountSetMember) -> Self {
31            Self {
32                id: member.id,
33                member_created_at: member.created_at,
34            }
35        }
36    }
37
38    #[cfg(feature = "graphql")]
39    impl async_graphql::connection::CursorType for AccountSetMembersByCreatedAtCursor {
40        type Error = String;
41
42        fn encode_cursor(&self) -> String {
43            use base64::{engine::general_purpose, Engine as _};
44            let json = serde_json::to_string(&self).expect("could not serialize token");
45            general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
46        }
47
48        fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
49            use base64::{engine::general_purpose, Engine as _};
50            let bytes = general_purpose::STANDARD_NO_PAD
51                .decode(s.as_bytes())
52                .map_err(|e| e.to_string())?;
53            let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
54            serde_json::from_str(&json).map_err(|e| e.to_string())
55        }
56    }
57
58    #[derive(Debug, Serialize, Deserialize)]
59    pub struct AccountSetMembersByExternalIdCursor {
60        pub id: AccountSetMemberId,
61        pub external_id: Option<String>,
62    }
63
64    impl From<&AccountSetMemberByExternalId> for AccountSetMembersByExternalIdCursor {
65        fn from(member: &AccountSetMemberByExternalId) -> Self {
66            Self {
67                id: member.id,
68                external_id: member.external_id.clone(),
69            }
70        }
71    }
72
73    #[cfg(feature = "graphql")]
74    impl async_graphql::connection::CursorType for AccountSetMembersByExternalIdCursor {
75        type Error = String;
76
77        fn encode_cursor(&self) -> String {
78            use base64::{engine::general_purpose, Engine as _};
79            let json = serde_json::to_string(&self).expect("could not serialize token");
80            general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
81        }
82
83        fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
84            use base64::{engine::general_purpose, Engine as _};
85            let bytes = general_purpose::STANDARD_NO_PAD
86                .decode(s.as_bytes())
87                .map_err(|e| e.to_string())?;
88            let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
89            serde_json::from_str(&json).map_err(|e| e.to_string())
90        }
91    }
92}
93
94use account_set_cursor::*;
95use members_cursor::*;
96
/// Repository for `AccountSet` entities and their membership tables.
///
/// The `EsRepo` derive is configured below with the entity type, the error
/// type, the indexed columns (`name`, `journal_id`, `external_id`), the
/// `cala` table prefix and `publish` as the post-persist hook.
/// NOTE(review): the exact methods and cursor types the derive generates
/// (e.g. the `account_set_cursor` module imported above) are not visible in
/// this file — confirm against the `es_entity` documentation.
#[derive(EsRepo, Debug, Clone)]
#[es_repo(
    entity = "AccountSet",
    err = "AccountSetError",
    columns(
        name(
            ty = "String",
            update(accessor = "values().name"),
            list_by,
            list_for(by(created_at))
        ),
        journal_id(ty = "JournalId", update(persist = false)),
        external_id(
            ty = "Option<String>",
            update(accessor = "values().external_id"),
            list_by
        ),
    ),
    tbl_prefix = "cala",
    post_persist_hook = "publish",
    persist_event_context = false
)]
pub(super) struct AccountSetRepo {
    // Connection pool used by the non-`_in_op` convenience methods.
    pool: PgPool,
    // Outbox publisher used for entity events and membership events.
    publisher: OutboxPublisher,
}
123
124impl AccountSetRepo {
125    pub fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
126        Self {
127            pool: pool.clone(),
128            publisher: publisher.clone(),
129        }
130    }
131
132    pub async fn list_children_by_created_at(
133        &self,
134        id: AccountSetId,
135        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
136    ) -> Result<
137        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
138        AccountSetError,
139    > {
140        self.list_children_by_created_at_in_op(&self.pool, id, args)
141            .await
142    }
143
144    #[instrument(
145        name = "account_set.list_children_by_created_at_in_op",
146        skip_all,
147        err(level = "warn")
148    )]
149    pub async fn list_children_by_created_at_in_op(
150        &self,
151        op: impl es_entity::IntoOneTimeExecutor<'_>,
152        account_set_id: AccountSetId,
153        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
154    ) -> Result<
155        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
156        AccountSetError,
157    > {
158        let es_entity::PaginatedQueryArgs { first, after } = args;
159        let (member_id, created_at) = if let Some(after) = after {
160            (Some(after.id), Some(after.member_created_at))
161        } else {
162            (None, None)
163        };
164
165        let id = match member_id {
166            Some(member_id) => match member_id {
167                AccountSetMemberId::Account(id) => Some(id),
168                AccountSetMemberId::AccountSet(id) => Some(id.into()),
169            },
170            None => None,
171        };
172
173        let rows = op
174            .into_executor()
175            .fetch_all(sqlx::query!(
176                r#"
177            WITH member_accounts AS (
178              SELECT
179                member_account_id AS member_id,
180                member_account_id,
181                NULL::uuid AS member_account_set_id,
182                created_at
183              FROM cala_account_set_member_accounts
184              WHERE
185                transitive IS FALSE
186                AND account_set_id = $4
187                AND (COALESCE((created_at, member_account_id) < ($3, $2), $2 IS NULL))
188              ORDER BY created_at DESC, member_account_id DESC
189              LIMIT $1
190            ), member_sets AS (
191              SELECT
192                member_account_set_id AS member_id,
193                NULL::uuid AS member_account_id,
194                member_account_set_id,
195                created_at
196              FROM cala_account_set_member_account_sets
197              WHERE
198                account_set_id = $4
199                AND (COALESCE((created_at, member_account_set_id) < ($3, $2), $2 IS NULL))
200              ORDER BY created_at DESC, member_account_set_id DESC
201              LIMIT $1
202            ), all_members AS (
203              SELECT * FROM member_accounts
204              UNION ALL
205              SELECT * FROM member_sets
206            )
207            SELECT * FROM all_members
208            ORDER BY created_at DESC, member_id DESC
209            LIMIT $1
210          "#,
211                (first + 1) as i64,
212                id.map(uuid::Uuid::from),
213                created_at,
214                uuid::Uuid::from(account_set_id),
215            ))
216            .await?;
217        let has_next_page = rows.len() > first;
218        let mut end_cursor = None;
219        if let Some(last) = rows.last() {
220            let id = last
221                .member_account_id
222                .map(|account_id| AccountSetMemberId::Account(account_id.into()))
223                .or_else(|| {
224                    last.member_account_set_id
225                        .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
226                });
227            end_cursor = Some(AccountSetMembersByCreatedAtCursor {
228                id: id.expect("member_id not set"),
229                member_created_at: last.created_at.expect("created_at not set"),
230            });
231        }
232
233        let account_set_members = rows
234            .into_iter()
235            .take(first)
236            .map(
237                |row| match (row.member_account_id, row.member_account_set_id) {
238                    (Some(member_account_id), _) => AccountSetMember::from((
239                        AccountSetMemberId::Account(AccountId::from(member_account_id)),
240                        row.created_at.expect("created at should always be present"),
241                    )),
242                    (_, Some(member_account_set_id)) => AccountSetMember::from((
243                        AccountSetMemberId::AccountSet(AccountSetId::from(member_account_set_id)),
244                        row.created_at.expect("created at should always be present"),
245                    )),
246                    _ => unreachable!(),
247                },
248            )
249            .collect::<Vec<AccountSetMember>>();
250
251        Ok(es_entity::PaginatedQueryRet {
252            entities: account_set_members,
253            has_next_page,
254            end_cursor,
255        })
256    }
257
258    pub async fn list_children_by_external_id(
259        &self,
260        id: AccountSetId,
261        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
262    ) -> Result<
263        es_entity::PaginatedQueryRet<
264            AccountSetMemberByExternalId,
265            AccountSetMembersByExternalIdCursor,
266        >,
267        AccountSetError,
268    > {
269        self.list_children_by_external_id_in_op(&self.pool, id, args)
270            .await
271    }
272
273    pub async fn list_children_by_external_id_in_op(
274        &self,
275        op: impl es_entity::IntoOneTimeExecutor<'_>,
276        account_set_id: AccountSetId,
277        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
278    ) -> Result<
279        es_entity::PaginatedQueryRet<
280            AccountSetMemberByExternalId,
281            AccountSetMembersByExternalIdCursor,
282        >,
283        AccountSetError,
284    > {
285        let es_entity::PaginatedQueryArgs { first, after } = args;
286        let (member_id, external_id) = if let Some(after) = after {
287            (Some(after.id), after.external_id)
288        } else {
289            (None, None)
290        };
291
292        let id = match member_id {
293            Some(member_id) => match member_id {
294                AccountSetMemberId::Account(id) => Some(id),
295                AccountSetMemberId::AccountSet(id) => Some(id.into()),
296            },
297            None => None,
298        };
299
300        let rows = op
301            .into_executor()
302            .fetch_all(sqlx::query!(
303                r#"
304            WITH member_accounts AS (
305              SELECT
306                member_account_id AS member_id,
307                member_account_id,
308                NULL::uuid AS member_account_set_id,
309                a.external_id
310              FROM cala_account_set_member_accounts m
311              LEFT JOIN cala_accounts a ON m.member_account_id = a.id
312              WHERE
313                transitive IS FALSE
314                AND m.account_set_id = $4
315                AND (
316                  ($3::varchar IS NULL) OR
317                  (a.external_id IS NULL AND $3::varchar IS NOT NULL) OR
318                  (a.external_id > $3::varchar) OR
319                  (a.external_id = $3::varchar AND member_account_id > $2)
320                )
321              ORDER BY a.external_id ASC NULLS LAST, member_account_id ASC
322              LIMIT $1
323            ), member_sets AS (
324              SELECT
325                member_account_set_id AS member_id,
326                NULL::uuid AS member_account_id,
327                member_account_set_id,
328                s.external_id
329              FROM cala_account_set_member_account_sets m
330              LEFT JOIN cala_account_sets s ON m.member_account_set_id = s.id
331              WHERE
332                m.account_set_id = $4
333                AND (
334                  ($3::varchar IS NULL) OR
335                  (s.external_id IS NULL AND $3::varchar IS NOT NULL) OR
336                  (s.external_id > $3::varchar) OR
337                  (s.external_id = $3::varchar AND member_account_set_id > $2)
338                )
339              ORDER BY s.external_id ASC NULLS LAST, member_account_set_id ASC
340              LIMIT $1
341            ), all_members AS (
342              SELECT * FROM member_accounts
343              UNION ALL
344              SELECT * FROM member_sets
345            )
346            SELECT * FROM all_members
347            ORDER BY external_id ASC NULLS LAST, member_id ASC
348            LIMIT $1
349        "#,
350                (first + 1) as i64,
351                id.map(uuid::Uuid::from),
352                external_id,
353                uuid::Uuid::from(account_set_id),
354            ))
355            .await?;
356
357        let has_next_page = rows.len() > first;
358        let mut end_cursor = None;
359        if let Some(last) = rows.last() {
360            let id = last
361                .member_account_id
362                .map(|account_id| AccountSetMemberId::Account(account_id.into()))
363                .or_else(|| {
364                    last.member_account_set_id
365                        .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
366                });
367            end_cursor = Some(AccountSetMembersByExternalIdCursor {
368                id: id.expect("member_id not set"),
369                external_id: last.external_id.clone(),
370            });
371        }
372
373        let account_set_members = rows
374            .into_iter()
375            .take(first)
376            .map(
377                |row| match (row.member_account_id, row.member_account_set_id) {
378                    (Some(member_account_id), _) => AccountSetMemberByExternalId {
379                        id: AccountSetMemberId::Account(AccountId::from(member_account_id)),
380                        external_id: row.external_id,
381                    },
382                    (_, Some(member_account_set_id)) => AccountSetMemberByExternalId {
383                        id: AccountSetMemberId::AccountSet(AccountSetId::from(
384                            member_account_set_id,
385                        )),
386                        external_id: row.external_id,
387                    },
388                    _ => unreachable!(),
389                },
390            )
391            .collect::<Vec<AccountSetMemberByExternalId>>();
392
393        Ok(es_entity::PaginatedQueryRet {
394            entities: account_set_members,
395            has_next_page,
396            end_cursor,
397        })
398    }
399
400    #[instrument(
401        name = "account_set.add_member_account_and_return_parents",
402        skip_all,
403        err(level = "warn")
404    )]
405    pub async fn add_member_account_and_return_parents(
406        &self,
407        db: &mut impl es_entity::AtomicOperation,
408        account_set_id: AccountSetId,
409        account_id: AccountId,
410    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
411        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
412            .execute(db.as_executor())
413            .await?;
414        let rows = sqlx::query!(r#"
415          WITH RECURSIVE parents AS (
416            SELECT m.member_account_set_id, m.account_set_id
417            FROM cala_account_set_member_account_sets m
418            JOIN cala_account_sets s
419            ON s.id = m.account_set_id
420            WHERE m.member_account_set_id = $1
421
422            UNION ALL
423            SELECT p.member_account_set_id, m.account_set_id
424            FROM parents p
425            JOIN cala_account_set_member_account_sets m
426                ON p.account_set_id = m.member_account_set_id
427          ),
428          non_transitive_insert AS (
429            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id)
430            VALUES ($1, $2)
431          ),
432          transitive_insert AS (
433            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
434            SELECT p.account_set_id, $2, TRUE
435            FROM parents p
436          )
437          SELECT account_set_id, NULL AS now
438          FROM parents
439          UNION ALL
440          SELECT NULL AS account_set_id, NOW() AS now
441          "#,
442            account_set_id as AccountSetId,
443            account_id as AccountId,
444        )
445        .fetch_all(db.as_executor())
446        .await?;
447        let mut time = None;
448        let ret = rows
449            .into_iter()
450            .filter_map(|row| {
451                if let Some(t) = row.now {
452                    time = Some(t);
453                    None
454                } else {
455                    Some(AccountSetId::from(
456                        row.account_set_id.expect("account_set_id not set"),
457                    ))
458                }
459            })
460            .collect();
461
462        self.publisher
463            .publish_all(
464                db,
465                std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
466                    account_set_id,
467                    member_id: crate::account_set::AccountSetMemberId::Account(account_id),
468                }),
469            )
470            .await?;
471
472        Ok((time.expect("time not set"), ret))
473    }
474
475    pub async fn remove_member_account_and_return_parents(
476        &self,
477        db: &mut impl es_entity::AtomicOperation,
478        account_set_id: AccountSetId,
479        account_id: AccountId,
480    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
481        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
482            .execute(db.as_executor())
483            .await?;
484        let rows = sqlx::query!(
485            r#"
486          WITH RECURSIVE parents AS (
487            SELECT m.member_account_set_id, m.account_set_id
488            FROM cala_account_set_member_account_sets m
489            JOIN cala_account_sets s
490            ON s.id = m.account_set_id
491            WHERE m.member_account_set_id = $1
492
493            UNION ALL
494            SELECT p.member_account_set_id, m.account_set_id
495            FROM parents p
496            JOIN cala_account_set_member_account_sets m
497                ON p.account_set_id = m.member_account_set_id
498          ),
499          deletions as (
500            DELETE FROM cala_account_set_member_accounts
501            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
502            AND member_account_id = $2
503          )
504          SELECT account_set_id, NULL AS now
505          FROM parents
506          UNION ALL
507          SELECT NULL AS account_set_id, NOW() AS now
508          "#,
509            account_set_id as AccountSetId,
510            account_id as AccountId,
511        )
512        .fetch_all(db.as_executor())
513        .await?;
514        let mut time = None;
515        let ret = rows
516            .into_iter()
517            .filter_map(|row| {
518                if let Some(t) = row.now {
519                    time = Some(t);
520                    None
521                } else {
522                    Some(AccountSetId::from(
523                        row.account_set_id.expect("account_set_id not set"),
524                    ))
525                }
526            })
527            .collect();
528
529        self.publisher
530            .publish_all(
531                db,
532                std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
533                    account_set_id,
534                    member_id: crate::account_set::AccountSetMemberId::Account(account_id),
535                }),
536            )
537            .await?;
538
539        Ok((time.expect("time not set"), ret))
540    }
541
542    pub async fn add_member_set_and_return_parents(
543        &self,
544        db: &mut impl es_entity::AtomicOperation,
545        account_set_id: AccountSetId,
546        member_account_set_id: AccountSetId,
547    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
548        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
549            .execute(db.as_executor())
550            .await?;
551        let rows = sqlx::query!(r#"
552          WITH RECURSIVE parents AS (
553            SELECT m.member_account_set_id, m.account_set_id
554            FROM cala_account_set_member_account_sets m
555            JOIN cala_account_sets s
556            ON s.id = m.account_set_id
557            WHERE m.member_account_set_id = $1
558
559            UNION ALL
560            SELECT p.member_account_set_id, m.account_set_id
561            FROM parents p
562            JOIN cala_account_set_member_account_sets m
563                ON p.account_set_id = m.member_account_set_id
564          ),
565          set_insert AS (
566            INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id)
567            VALUES ($1, $2)
568          ),
569          new_members AS (
570            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
571            SELECT $1, m.member_account_id, TRUE
572            FROM cala_account_set_member_accounts m
573            WHERE m.account_set_id = $2
574            RETURNING member_account_id
575          ),
576          transitive_inserts AS (
577            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
578            SELECT p.account_set_id, n.member_account_id, TRUE
579            FROM parents p
580            CROSS JOIN new_members n
581          )
582          SELECT account_set_id, NULL AS now
583          FROM parents
584          UNION ALL
585          SELECT NULL AS account_set_id, NOW() AS now
586          "#,
587            account_set_id as AccountSetId,
588            member_account_set_id as AccountSetId,
589        )
590        .fetch_all(db.as_executor())
591        .await?;
592        let mut time = None;
593        let ret = rows
594            .into_iter()
595            .filter_map(|row| {
596                if let Some(t) = row.now {
597                    time = Some(t);
598                    None
599                } else {
600                    Some(AccountSetId::from(
601                        row.account_set_id.expect("account_set_id not set"),
602                    ))
603                }
604            })
605            .collect();
606
607        self.publisher
608            .publish_all(
609                db,
610                std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
611                    account_set_id,
612                    member_id: crate::account_set::AccountSetMemberId::AccountSet(
613                        member_account_set_id,
614                    ),
615                }),
616            )
617            .await?;
618
619        Ok((time.expect("time not set"), ret))
620    }
621
622    pub async fn remove_member_set_and_return_parents(
623        &self,
624        db: &mut impl es_entity::AtomicOperation,
625        account_set_id: AccountSetId,
626        member_account_set_id: AccountSetId,
627    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
628        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
629            .execute(db.as_executor())
630            .await?;
631        let rows = sqlx::query!(
632            r#"
633          WITH RECURSIVE parents AS (
634            SELECT m.member_account_set_id, m.account_set_id
635            FROM cala_account_set_member_account_sets m
636            JOIN cala_account_sets s
637            ON s.id = m.account_set_id
638            WHERE m.member_account_set_id = $1
639
640            UNION ALL
641            SELECT p.member_account_set_id, m.account_set_id
642            FROM parents p
643            JOIN cala_account_set_member_account_sets m
644                ON p.account_set_id = m.member_account_set_id
645          ),
646          member_accounts_deletion AS (
647            DELETE FROM cala_account_set_member_accounts
648            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
649            AND member_account_id IN (SELECT member_account_id FROM cala_account_set_member_accounts
650                                      WHERE account_set_id = $2)
651          ),
652          member_account_set_deletion AS (
653            DELETE FROM cala_account_set_member_account_sets
654            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
655            AND member_account_set_id = $2
656          )
657          SELECT account_set_id, NULL AS now
658          FROM parents
659          UNION ALL
660          SELECT NULL AS account_set_id, NOW() AS now
661          "#,
662            account_set_id as AccountSetId,
663            member_account_set_id as AccountSetId,
664        )
665        .fetch_all(db.as_executor())
666        .await?;
667        let mut time = None;
668        let ret = rows
669            .into_iter()
670            .filter_map(|row| {
671                if let Some(t) = row.now {
672                    time = Some(t);
673                    None
674                } else {
675                    Some(AccountSetId::from(
676                        row.account_set_id.expect("account_set_id not set"),
677                    ))
678                }
679            })
680            .collect();
681
682        self.publisher
683            .publish_all(
684                db,
685                std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
686                    account_set_id,
687                    member_id: crate::account_set::AccountSetMemberId::AccountSet(
688                        member_account_set_id,
689                    ),
690                }),
691            )
692            .await?;
693
694        Ok((time.expect("time not set"), ret))
695    }
696
697    pub async fn find_where_account_is_member(
698        &self,
699        account_id: AccountId,
700        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
701    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
702    {
703        self.find_where_account_is_member_in_op(&self.pool, account_id, query)
704            .await
705    }
706
707    pub async fn find_where_account_is_member_in_op(
708        &self,
709        op: impl es_entity::IntoOneTimeExecutor<'_>,
710        account_id: AccountId,
711        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
712    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
713    {
714        let (entities, has_next_page) = es_entity::es_query!(
715            tbl_prefix = "cala",
716            r#"SELECT a.id, a.name, a.created_at
717              FROM cala_account_sets a
718              JOIN cala_account_set_member_accounts asm
719              ON asm.account_set_id = a.id
720              WHERE asm.member_account_id = $1 AND transitive IS FALSE
721              AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
722              ORDER BY a.name, a.id
723              LIMIT $4"#,
724            account_id as AccountId,
725            query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
726            query.after.map(|c| c.name),
727            query.first as i64 + 1
728        )
729        .fetch_n(op, query.first)
730        .await?;
731
732        let mut end_cursor = None;
733        if let Some(last) = entities.last() {
734            end_cursor = Some(AccountSetsByNameCursor {
735                id: last.values().id,
736                name: last.values().name.clone(),
737            });
738        }
739        Ok(es_entity::PaginatedQueryRet {
740            entities,
741            has_next_page,
742            end_cursor,
743        })
744    }
745
746    pub async fn find_where_account_set_is_member(
747        &self,
748        account_set_id: AccountSetId,
749        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
750    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
751    {
752        self.find_where_account_set_is_member_in_op(&self.pool, account_set_id, query)
753            .await
754    }
755
756    pub async fn find_where_account_set_is_member_in_op(
757        &self,
758        op: impl es_entity::IntoOneTimeExecutor<'_>,
759        account_set_id: AccountSetId,
760        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
761    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
762    {
763        let (entities, has_next_page) = es_entity::es_query!(
764            tbl_prefix = "cala",
765            r#"SELECT a.id, a.name, a.created_at
766               FROM cala_account_sets a
767               JOIN cala_account_set_member_account_sets asm
768               ON asm.account_set_id = a.id
769               WHERE asm.member_account_set_id = $1
770               AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
771               ORDER BY a.name, a.id
772               LIMIT $4"#,
773            account_set_id as AccountSetId,
774            query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
775            query.after.map(|c| c.name),
776            query.first as i64 + 1
777        )
778        .fetch_n(op, query.first)
779        .await?;
780        let mut end_cursor = None;
781        if let Some(last) = entities.last() {
782            end_cursor = Some(AccountSetsByNameCursor {
783                id: last.values().id,
784                name: last.values().name.clone(),
785            });
786        }
787        Ok(es_entity::PaginatedQueryRet {
788            entities,
789            has_next_page,
790            end_cursor,
791        })
792    }
793
794    #[instrument(
795        name = "account_set.fetch_mappings_in_op",
796        skip_all,
797        err(level = "warn")
798    )]
799    pub async fn fetch_mappings_in_op(
800        &self,
801        op: impl es_entity::IntoOneTimeExecutor<'_>,
802        journal_id: JournalId,
803        account_ids: &[AccountId],
804    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
805        let rows = op.into_executor().fetch_all(sqlx::query!(
806            r#"
807          SELECT m.account_set_id AS "set_id!: AccountSetId", m.member_account_id AS "account_id!: AccountId"
808          FROM cala_account_set_member_accounts m
809          JOIN cala_account_sets s
810          ON m.account_set_id = s.id AND s.journal_id = $1
811          WHERE m.member_account_id = ANY($2)
812          "#,
813            journal_id as JournalId,
814            account_ids as &[AccountId]
815        ))
816        .await?;
817        let mut mappings = HashMap::new();
818        for row in rows {
819            mappings
820                .entry(row.account_id)
821                .or_insert_with(Vec::new)
822                .push(row.set_id);
823        }
824        Ok(mappings)
825    }
826
827    async fn publish(
828        &self,
829        op: &mut impl es_entity::AtomicOperation,
830        entity: &AccountSet,
831        new_events: es_entity::LastPersisted<'_, AccountSetEvent>,
832    ) -> Result<(), AccountSetError> {
833        self.publisher
834            .publish_entity_events(op, entity, new_events)
835            .await?;
836        Ok(())
837    }
838}