cala_ledger/account_set/
repo.rs

1use chrono::{DateTime, Utc};
2use es_entity::*;
3use sqlx::{Executor, PgPool, Postgres, Transaction};
4
5use std::collections::HashMap;
6
7use crate::primitives::{AccountId, DataSourceId, JournalId};
8
9use super::{entity::*, error::*};
10
// Key passed to `pg_advisory_xact_lock` by the membership add/remove queries in this file,
// so they all contend on the same advisory lock.
const ADDVISORY_LOCK_ID: i64 = 123456;
12
13pub mod members_cursor {
14    use cala_types::account_set::{AccountSetMember, AccountSetMemberId};
15    use serde::{Deserialize, Serialize};
16
17    #[derive(Debug, Serialize, Deserialize)]
18    pub struct AccountSetMembersCursor {
19        pub id: AccountSetMemberId,
20        pub member_created_at: chrono::DateTime<chrono::Utc>,
21    }
22
23    impl From<&AccountSetMember> for AccountSetMembersCursor {
24        fn from(member: &AccountSetMember) -> Self {
25            Self {
26                id: member.id,
27                member_created_at: member.created_at,
28            }
29        }
30    }
31
32    #[cfg(feature = "graphql")]
33    impl async_graphql::connection::CursorType for AccountSetMembersCursor {
34        type Error = String;
35
36        fn encode_cursor(&self) -> String {
37            use base64::{engine::general_purpose, Engine as _};
38            let json = serde_json::to_string(&self).expect("could not serialize token");
39            general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
40        }
41
42        fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
43            use base64::{engine::general_purpose, Engine as _};
44            let bytes = general_purpose::STANDARD_NO_PAD
45                .decode(s.as_bytes())
46                .map_err(|e| e.to_string())?;
47            let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
48            serde_json::from_str(&json).map_err(|e| e.to_string())
49        }
50    }
51}
52
53use account_set_cursor::*;
54use members_cursor::*;
55
/// Repository for `AccountSet` entities.
///
/// The `EsRepo` derive is configured below with the `name`, `journal_id`,
/// `external_id` and `data_source_id` columns and the `cala` table prefix;
/// the hand-written methods in the `impl` block add membership queries on top.
#[derive(EsRepo, Debug, Clone)]
#[es_repo(
    entity = "AccountSet",
    err = "AccountSetError",
    columns(
        name(ty = "String", update(accessor = "values().name"), list_by, list_for),
        journal_id(ty = "JournalId", update(persist = false)),
        external_id(
            ty = "Option<String>",
            update(accessor = "values().external_id"),
            list_by
        ),
        data_source_id(
            ty = "DataSourceId",
            create(accessor = "data_source().into()"),
            update(persist = false)
        ),
    ),
    tbl_prefix = "cala"
)]
pub(super) struct AccountSetRepo {
    // Connection pool used by the non-transactional query methods.
    pool: PgPool,
}
79
80impl AccountSetRepo {
81    pub fn new(pool: &PgPool) -> Self {
82        Self { pool: pool.clone() }
83    }
84
85    pub async fn list_children(
86        &self,
87        id: AccountSetId,
88        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
89    ) -> Result<
90        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
91        AccountSetError,
92    > {
93        self.list_children_in_executor(&self.pool, id, args).await
94    }
95
96    pub async fn list_children_in_tx(
97        &self,
98        db: &mut Transaction<'_, Postgres>,
99        id: AccountSetId,
100        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
101    ) -> Result<
102        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
103        AccountSetError,
104    > {
105        self.list_children_in_executor(&mut **db, id, args).await
106    }
107
108    async fn list_children_in_executor(
109        &self,
110        executor: impl Executor<'_, Database = Postgres>,
111        account_set_id: AccountSetId,
112        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
113    ) -> Result<
114        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
115        AccountSetError,
116    > {
117        let es_entity::PaginatedQueryArgs { first, after } = args;
118        let (member_id, created_at) = if let Some(after) = after {
119            (Some(after.id), Some(after.member_created_at))
120        } else {
121            (None, None)
122        };
123
124        let id = match member_id {
125            Some(member_id) => match member_id {
126                AccountSetMemberId::Account(id) => Some(id),
127                AccountSetMemberId::AccountSet(id) => Some(id.into()),
128            },
129            None => None,
130        };
131
132        let rows = sqlx::query!(
133            r#"
134            WITH member_accounts AS (
135              SELECT
136                member_account_id AS member_id,
137                member_account_id,
138                NULL::uuid AS member_account_set_id,
139                created_at
140              FROM cala_account_set_member_accounts
141              WHERE
142                transitive IS FALSE
143                AND account_set_id = $4
144                AND (COALESCE((created_at, member_account_id) < ($3, $2), $2 IS NULL))
145              ORDER BY created_at DESC, member_account_id DESC
146              LIMIT $1
147            ), member_sets AS (
148              SELECT
149                member_account_set_id AS member_id,
150                NULL::uuid AS member_account_id,
151                member_account_set_id,
152                created_at
153              FROM cala_account_set_member_account_sets
154              WHERE
155                account_set_id = $4
156                AND (COALESCE((created_at, member_account_set_id) < ($3, $2), $2 IS NULL))
157              ORDER BY created_at DESC, member_account_set_id DESC
158              LIMIT $1
159            ), all_members AS (
160              SELECT * FROM member_accounts
161              UNION ALL
162              SELECT * FROM member_sets
163            )
164            SELECT * FROM all_members
165            ORDER BY created_at DESC, member_id DESC
166            LIMIT $1
167          "#,
168            (first + 1) as i64,
169            id.map(uuid::Uuid::from),
170            created_at,
171            uuid::Uuid::from(account_set_id),
172        )
173        .fetch_all(executor)
174        .await?;
175        let has_next_page = rows.len() > first;
176        let mut end_cursor = None;
177        if let Some(last) = rows.last() {
178            let id = last
179                .member_account_id
180                .map(|account_id| AccountSetMemberId::Account(account_id.into()))
181                .or_else(|| {
182                    last.member_account_set_id
183                        .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
184                });
185            end_cursor = Some(AccountSetMembersCursor {
186                id: id.expect("member_id not set"),
187                member_created_at: last.created_at.expect("created_at not set"),
188            });
189        }
190
191        let account_set_members = rows
192            .into_iter()
193            .take(first)
194            .map(
195                |row| match (row.member_account_id, row.member_account_set_id) {
196                    (Some(member_account_id), _) => AccountSetMember::from((
197                        AccountSetMemberId::Account(AccountId::from(member_account_id)),
198                        row.created_at.expect("created at should always be present"),
199                    )),
200                    (_, Some(member_account_set_id)) => AccountSetMember::from((
201                        AccountSetMemberId::AccountSet(AccountSetId::from(member_account_set_id)),
202                        row.created_at.expect("created at should always be present"),
203                    )),
204                    _ => unreachable!(),
205                },
206            )
207            .collect::<Vec<AccountSetMember>>();
208
209        Ok(es_entity::PaginatedQueryRet {
210            entities: account_set_members,
211            has_next_page,
212            end_cursor,
213        })
214    }
215
216    pub async fn add_member_account_and_return_parents(
217        &self,
218        db: &mut Transaction<'_, Postgres>,
219        account_set_id: AccountSetId,
220        account_id: AccountId,
221    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
222        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
223            .execute(&mut **db)
224            .await?;
225        let rows = sqlx::query!(r#"
226          WITH RECURSIVE parents AS (
227            SELECT m.member_account_set_id, m.account_set_id
228            FROM cala_account_set_member_account_sets m
229            JOIN cala_account_sets s
230            ON s.id = m.account_set_id
231            WHERE m.member_account_set_id = $1
232
233            UNION ALL
234            SELECT p.member_account_set_id, m.account_set_id
235            FROM parents p
236            JOIN cala_account_set_member_account_sets m
237                ON p.account_set_id = m.member_account_set_id
238          ),
239          non_transitive_insert AS (
240            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id)
241            VALUES ($1, $2)
242          ),
243          transitive_insert AS (
244            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
245            SELECT p.account_set_id, $2, TRUE
246            FROM parents p
247          )
248          SELECT account_set_id, NULL AS now
249          FROM parents
250          UNION ALL
251          SELECT NULL AS account_set_id, NOW() AS now
252          "#,
253            account_set_id as AccountSetId,
254            account_id as AccountId,
255        )
256        .fetch_all(&mut **db)
257        .await?;
258        let mut time = None;
259        let ret = rows
260            .into_iter()
261            .filter_map(|row| {
262                if let Some(t) = row.now {
263                    time = Some(t);
264                    None
265                } else {
266                    Some(AccountSetId::from(
267                        row.account_set_id.expect("account_set_id not set"),
268                    ))
269                }
270            })
271            .collect();
272        Ok((time.expect("time not set"), ret))
273    }
274
275    pub async fn remove_member_account_and_return_parents(
276        &self,
277        db: &mut Transaction<'_, Postgres>,
278        account_set_id: AccountSetId,
279        account_id: AccountId,
280    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
281        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
282            .execute(&mut **db)
283            .await?;
284        let rows = sqlx::query!(
285            r#"
286          WITH RECURSIVE parents AS (
287            SELECT m.member_account_set_id, m.account_set_id
288            FROM cala_account_set_member_account_sets m
289            JOIN cala_account_sets s
290            ON s.id = m.account_set_id
291            WHERE m.member_account_set_id = $1
292
293            UNION ALL
294            SELECT p.member_account_set_id, m.account_set_id
295            FROM parents p
296            JOIN cala_account_set_member_account_sets m
297                ON p.account_set_id = m.member_account_set_id
298          ),
299          deletions as (
300            DELETE FROM cala_account_set_member_accounts
301            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
302            AND member_account_id = $2
303          )
304          SELECT account_set_id, NULL AS now
305          FROM parents
306          UNION ALL
307          SELECT NULL AS account_set_id, NOW() AS now
308          "#,
309            account_set_id as AccountSetId,
310            account_id as AccountId,
311        )
312        .fetch_all(&mut **db)
313        .await?;
314        let mut time = None;
315        let ret = rows
316            .into_iter()
317            .filter_map(|row| {
318                if let Some(t) = row.now {
319                    time = Some(t);
320                    None
321                } else {
322                    Some(AccountSetId::from(
323                        row.account_set_id.expect("account_set_id not set"),
324                    ))
325                }
326            })
327            .collect();
328        Ok((time.expect("time not set"), ret))
329    }
330
331    pub async fn add_member_set_and_return_parents(
332        &self,
333        db: &mut Transaction<'_, Postgres>,
334        account_set_id: AccountSetId,
335        member_account_set_id: AccountSetId,
336    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
337        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
338            .execute(&mut **db)
339            .await?;
340        let rows = sqlx::query!(r#"
341          WITH RECURSIVE parents AS (
342            SELECT m.member_account_set_id, m.account_set_id
343            FROM cala_account_set_member_account_sets m
344            JOIN cala_account_sets s
345            ON s.id = m.account_set_id
346            WHERE m.member_account_set_id = $1
347
348            UNION ALL
349            SELECT p.member_account_set_id, m.account_set_id
350            FROM parents p
351            JOIN cala_account_set_member_account_sets m
352                ON p.account_set_id = m.member_account_set_id
353          ),
354          set_insert AS (
355            INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id)
356            VALUES ($1, $2)
357          ),
358          new_members AS (
359            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
360            SELECT $1, m.member_account_id, TRUE
361            FROM cala_account_set_member_accounts m
362            WHERE m.account_set_id = $2
363            RETURNING member_account_id
364          ),
365          transitive_inserts AS (
366            INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
367            SELECT p.account_set_id, n.member_account_id, TRUE
368            FROM parents p
369            CROSS JOIN new_members n
370          )
371          SELECT account_set_id, NULL AS now
372          FROM parents
373          UNION ALL
374          SELECT NULL AS account_set_id, NOW() AS now
375          "#,
376            account_set_id as AccountSetId,
377            member_account_set_id as AccountSetId,
378        )
379        .fetch_all(&mut **db)
380        .await?;
381        let mut time = None;
382        let ret = rows
383            .into_iter()
384            .filter_map(|row| {
385                if let Some(t) = row.now {
386                    time = Some(t);
387                    None
388                } else {
389                    Some(AccountSetId::from(
390                        row.account_set_id.expect("account_set_id not set"),
391                    ))
392                }
393            })
394            .collect();
395        Ok((time.expect("time not set"), ret))
396    }
397
398    pub async fn remove_member_set_and_return_parents(
399        &self,
400        db: &mut Transaction<'_, Postgres>,
401        account_set_id: AccountSetId,
402        member_account_set_id: AccountSetId,
403    ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
404        sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
405            .execute(&mut **db)
406            .await?;
407        let rows = sqlx::query!(
408            r#"
409          WITH RECURSIVE parents AS (
410            SELECT m.member_account_set_id, m.account_set_id
411            FROM cala_account_set_member_account_sets m
412            JOIN cala_account_sets s
413            ON s.id = m.account_set_id
414            WHERE m.member_account_set_id = $1
415
416            UNION ALL
417            SELECT p.member_account_set_id, m.account_set_id
418            FROM parents p
419            JOIN cala_account_set_member_account_sets m
420                ON p.account_set_id = m.member_account_set_id
421          ),
422          member_accounts_deletion AS (
423            DELETE FROM cala_account_set_member_accounts
424            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
425            AND member_account_id IN (SELECT member_account_id FROM cala_account_set_member_accounts
426                                      WHERE account_set_id = $2)
427          ),
428          member_account_set_deletion AS (
429            DELETE FROM cala_account_set_member_account_sets
430            WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
431            AND member_account_set_id = $2
432          )
433          SELECT account_set_id, NULL AS now
434          FROM parents
435          UNION ALL
436          SELECT NULL AS account_set_id, NOW() AS now
437          "#,
438            account_set_id as AccountSetId,
439            member_account_set_id as AccountSetId,
440        )
441        .fetch_all(&mut **db)
442        .await?;
443        let mut time = None;
444        let ret = rows
445            .into_iter()
446            .filter_map(|row| {
447                if let Some(t) = row.now {
448                    time = Some(t);
449                    None
450                } else {
451                    Some(AccountSetId::from(
452                        row.account_set_id.expect("account_set_id not set"),
453                    ))
454                }
455            })
456            .collect();
457        Ok((time.expect("time not set"), ret))
458    }
459
460    pub async fn find_where_account_is_member(
461        &self,
462        account_id: AccountId,
463        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
464    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
465    {
466        self.find_where_account_is_member_in_executor(&self.pool, account_id, query)
467            .await
468    }
469
470    pub async fn find_where_account_is_member_in_tx(
471        &self,
472        tx: &mut Transaction<'_, Postgres>,
473        account_id: AccountId,
474        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
475    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
476    {
477        self.find_where_account_is_member_in_executor(&mut **tx, account_id, query)
478            .await
479    }
480
481    async fn find_where_account_is_member_in_executor(
482        &self,
483        executor: impl Executor<'_, Database = Postgres>,
484        account_id: AccountId,
485        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
486    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
487    {
488        let rows = sqlx::query_as!(
489            account_set_repo_types::Repo__DbEvent,
490            r#"
491            WITH member_account_sets AS (
492              SELECT a.id, a.name, a.created_at
493              FROM cala_account_set_member_accounts asm
494              JOIN cala_account_sets a ON asm.account_set_id = a.id
495              WHERE asm.member_account_id = $1 AND transitive IS FALSE
496              AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
497              ORDER BY a.name, a.id
498              LIMIT $4
499            )
500            SELECT mas.id AS "entity_id!: AccountSetId", e.sequence, e.event, e.recorded_at
501              FROM member_account_sets mas
502              JOIN cala_account_set_events e ON mas.id = e.id
503              ORDER BY mas.name, mas.id, e.sequence
504            "#,
505            account_id as AccountId,
506            query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
507            query.after.map(|c| c.name),
508            query.first as i64 + 1
509        )
510        .fetch_all(executor)
511        .await?;
512
513        let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
514        let mut end_cursor = None;
515        if let Some(last) = entities.last() {
516            end_cursor = Some(AccountSetsByNameCursor {
517                id: last.values().id,
518                name: last.values().name.clone(),
519            });
520        }
521        Ok(es_entity::PaginatedQueryRet {
522            entities,
523            has_next_page,
524            end_cursor,
525        })
526    }
527
528    pub async fn find_where_account_set_is_member(
529        &self,
530        account_set_id: AccountSetId,
531        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
532    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
533    {
534        self.find_where_account_set_is_member_in_executor(&self.pool, account_set_id, query)
535            .await
536    }
537
538    pub async fn find_where_account_set_is_member_in_tx(
539        &self,
540        tx: &mut Transaction<'_, Postgres>,
541        account_set_id: AccountSetId,
542        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
543    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
544    {
545        self.find_where_account_set_is_member_in_executor(&mut **tx, account_set_id, query)
546            .await
547    }
548
549    async fn find_where_account_set_is_member_in_executor(
550        &self,
551        executor: impl Executor<'_, Database = Postgres>,
552        account_set_id: AccountSetId,
553        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
554    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
555    {
556        let rows = sqlx::query_as!(
557            account_set_repo_types::Repo__DbEvent,
558            r#"
559            WITH member_account_sets AS (
560              SELECT a.id, a.name, a.created_at
561              FROM cala_account_set_member_account_sets asm
562              JOIN cala_account_sets a ON asm.account_set_id = a.id
563              WHERE asm.member_account_set_id = $1
564              AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
565              ORDER BY a.name, a.id
566              LIMIT $4
567            )
568            SELECT mas.id AS "entity_id!: AccountSetId", e.sequence, e.event, e.recorded_at
569              FROM member_account_sets mas
570              JOIN cala_account_set_events e ON mas.id = e.id
571              ORDER BY mas.name, mas.id, e.sequence
572            "#,
573            account_set_id as AccountSetId,
574            query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
575            query.after.map(|c| c.name),
576            query.first as i64 + 1
577        )
578        .fetch_all(executor)
579        .await?;
580
581        let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
582        let mut end_cursor = None;
583        if let Some(last) = entities.last() {
584            end_cursor = Some(AccountSetsByNameCursor {
585                id: last.values().id,
586                name: last.values().name.clone(),
587            });
588        }
589        Ok(es_entity::PaginatedQueryRet {
590            entities,
591            has_next_page,
592            end_cursor,
593        })
594    }
595
596    #[cfg(feature = "import")]
597    pub async fn import_in_op(
598        &self,
599        op: &mut DbOp<'_>,
600        origin: DataSourceId,
601        account_set: &mut AccountSet,
602    ) -> Result<(), AccountSetError> {
603        let recorded_at = op.now();
604        sqlx::query!(
605            r#"INSERT INTO cala_account_sets (data_source_id, id, journal_id, name, external_id, created_at)
606            VALUES ($1, $2, $3, $4, $5, $6)"#,
607            origin as DataSourceId,
608            account_set.values().id as AccountSetId,
609            account_set.values().journal_id as JournalId,
610            account_set.values().name,
611            account_set.values().external_id,
612            recorded_at
613        )
614        .execute(&mut **op.tx())
615        .await?;
616        self.persist_events(op, &mut account_set.events).await?;
617        Ok(())
618    }
619
620    pub async fn fetch_mappings(
621        &self,
622        journal_id: JournalId,
623        account_ids: &[AccountId],
624    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
625        let rows = sqlx::query!(
626            r#"
627          SELECT m.account_set_id AS "set_id!: AccountSetId", m.member_account_id AS "account_id!: AccountId"
628          FROM cala_account_set_member_accounts m
629          JOIN cala_account_sets s
630          ON m.account_set_id = s.id AND s.journal_id = $1
631          WHERE m.member_account_id = ANY($2)
632          "#,
633            journal_id as JournalId,
634            account_ids as &[AccountId]
635        )
636        .fetch_all(&self.pool)
637        .await?;
638        let mut mappings = HashMap::new();
639        for row in rows {
640            mappings
641                .entry(row.account_id)
642                .or_insert_with(Vec::new)
643                .push(row.set_id);
644        }
645        Ok(mappings)
646    }
647
648    #[cfg(feature = "import")]
649    pub async fn import_member_account_in_op(
650        &self,
651        op: &mut DbOp<'_>,
652        account_set_id: AccountSetId,
653        account_id: AccountId,
654    ) -> Result<(), AccountSetError> {
655        let recorded_at = op.now();
656        sqlx::query!(
657            r#"INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, created_at)
658            VALUES ($1, $2, $3)"#,
659            account_set_id as AccountSetId,
660            account_id as AccountId,
661            recorded_at
662        )
663        .execute(&mut **op.tx())
664        .await?;
665        Ok(())
666    }
667
668    #[cfg(feature = "import")]
669    pub async fn import_remove_member_account(
670        &self,
671        db: &mut Transaction<'_, Postgres>,
672        account_set_id: AccountSetId,
673        account_id: AccountId,
674    ) -> Result<(), AccountSetError> {
675        sqlx::query!(
676            r#"DELETE FROM cala_account_set_member_accounts
677            WHERE account_set_id = $1 AND member_account_id = $2"#,
678            account_set_id as AccountSetId,
679            account_id as AccountId,
680        )
681        .execute(&mut **db)
682        .await?;
683        Ok(())
684    }
685
686    #[cfg(feature = "import")]
687    pub async fn import_member_set_in_op(
688        &self,
689        op: &mut DbOp<'_>,
690        account_set_id: AccountSetId,
691        member_account_set_id: AccountSetId,
692    ) -> Result<(), AccountSetError> {
693        let recorded_at = op.now();
694        sqlx::query!(
695            r#"INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id, created_at)
696            VALUES ($1, $2, $3)"#,
697            account_set_id as AccountSetId,
698            member_account_set_id as AccountSetId,
699            recorded_at
700        )
701        .execute(&mut **op.tx())
702        .await?;
703        Ok(())
704    }
705
706    #[cfg(feature = "import")]
707    pub async fn import_remove_member_set(
708        &self,
709        db: &mut Transaction<'_, Postgres>,
710        account_set_id: AccountSetId,
711        member_account_set_id: AccountSetId,
712    ) -> Result<(), AccountSetError> {
713        sqlx::query!(
714            r#"DELETE FROM cala_account_set_member_account_sets
715            WHERE account_set_id = $1 AND member_account_set_id = $2"#,
716            account_set_id as AccountSetId,
717            member_account_set_id as AccountSetId,
718        )
719        .execute(&mut **db)
720        .await?;
721        Ok(())
722    }
723}