1use chrono::{DateTime, Utc};
2use es_entity::*;
3use sqlx::{Executor, PgPool, Postgres, Transaction};
4
5use std::collections::HashMap;
6
7use crate::primitives::{AccountId, DataSourceId, JournalId};
8
9use super::{entity::*, error::*};
10
11const ADDVISORY_LOCK_ID: i64 = 123456;
12
13pub mod members_cursor {
14 use cala_types::account_set::{AccountSetMember, AccountSetMemberId};
15 use serde::{Deserialize, Serialize};
16
17 #[derive(Debug, Serialize, Deserialize)]
18 pub struct AccountSetMembersCursor {
19 pub id: AccountSetMemberId,
20 pub member_created_at: chrono::DateTime<chrono::Utc>,
21 }
22
23 impl From<&AccountSetMember> for AccountSetMembersCursor {
24 fn from(member: &AccountSetMember) -> Self {
25 Self {
26 id: member.id,
27 member_created_at: member.created_at,
28 }
29 }
30 }
31
32 #[cfg(feature = "graphql")]
33 impl async_graphql::connection::CursorType for AccountSetMembersCursor {
34 type Error = String;
35
36 fn encode_cursor(&self) -> String {
37 use base64::{engine::general_purpose, Engine as _};
38 let json = serde_json::to_string(&self).expect("could not serialize token");
39 general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
40 }
41
42 fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
43 use base64::{engine::general_purpose, Engine as _};
44 let bytes = general_purpose::STANDARD_NO_PAD
45 .decode(s.as_bytes())
46 .map_err(|e| e.to_string())?;
47 let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
48 serde_json::from_str(&json).map_err(|e| e.to_string())
49 }
50 }
51}
52
53use account_set_cursor::*;
54use members_cursor::*;
55
56#[derive(EsRepo, Debug, Clone)]
57#[es_repo(
58 entity = "AccountSet",
59 err = "AccountSetError",
60 columns(
61 name(ty = "String", update(accessor = "values().name"), list_by, list_for),
62 journal_id(ty = "JournalId", update(persist = false)),
63 external_id(
64 ty = "Option<String>",
65 update(accessor = "values().external_id"),
66 list_by
67 ),
68 data_source_id(
69 ty = "DataSourceId",
70 create(accessor = "data_source().into()"),
71 update(persist = false)
72 ),
73 ),
74 tbl_prefix = "cala"
75)]
76pub(super) struct AccountSetRepo {
77 pool: PgPool,
78}
79
80impl AccountSetRepo {
81 pub fn new(pool: &PgPool) -> Self {
82 Self { pool: pool.clone() }
83 }
84
85 pub async fn list_children(
86 &self,
87 id: AccountSetId,
88 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
89 ) -> Result<
90 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
91 AccountSetError,
92 > {
93 self.list_children_in_executor(&self.pool, id, args).await
94 }
95
96 pub async fn list_children_in_tx(
97 &self,
98 db: &mut Transaction<'_, Postgres>,
99 id: AccountSetId,
100 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
101 ) -> Result<
102 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
103 AccountSetError,
104 > {
105 self.list_children_in_executor(&mut **db, id, args).await
106 }
107
108 async fn list_children_in_executor(
109 &self,
110 executor: impl Executor<'_, Database = Postgres>,
111 account_set_id: AccountSetId,
112 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
113 ) -> Result<
114 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
115 AccountSetError,
116 > {
117 let es_entity::PaginatedQueryArgs { first, after } = args;
118 let (member_id, created_at) = if let Some(after) = after {
119 (Some(after.id), Some(after.member_created_at))
120 } else {
121 (None, None)
122 };
123
124 let id = match member_id {
125 Some(member_id) => match member_id {
126 AccountSetMemberId::Account(id) => Some(id),
127 AccountSetMemberId::AccountSet(id) => Some(id.into()),
128 },
129 None => None,
130 };
131
132 let rows = sqlx::query!(
133 r#"
134 WITH member_accounts AS (
135 SELECT
136 member_account_id AS member_id,
137 member_account_id,
138 NULL::uuid AS member_account_set_id,
139 created_at
140 FROM cala_account_set_member_accounts
141 WHERE
142 transitive IS FALSE
143 AND account_set_id = $4
144 AND (COALESCE((created_at, member_account_id) < ($3, $2), $2 IS NULL))
145 ORDER BY created_at DESC, member_account_id DESC
146 LIMIT $1
147 ), member_sets AS (
148 SELECT
149 member_account_set_id AS member_id,
150 NULL::uuid AS member_account_id,
151 member_account_set_id,
152 created_at
153 FROM cala_account_set_member_account_sets
154 WHERE
155 account_set_id = $4
156 AND (COALESCE((created_at, member_account_set_id) < ($3, $2), $2 IS NULL))
157 ORDER BY created_at DESC, member_account_set_id DESC
158 LIMIT $1
159 ), all_members AS (
160 SELECT * FROM member_accounts
161 UNION ALL
162 SELECT * FROM member_sets
163 )
164 SELECT * FROM all_members
165 ORDER BY created_at DESC, member_id DESC
166 LIMIT $1
167 "#,
168 (first + 1) as i64,
169 id.map(uuid::Uuid::from),
170 created_at,
171 uuid::Uuid::from(account_set_id),
172 )
173 .fetch_all(executor)
174 .await?;
175 let has_next_page = rows.len() > first;
176 let mut end_cursor = None;
177 if let Some(last) = rows.last() {
178 let id = last
179 .member_account_id
180 .map(|account_id| AccountSetMemberId::Account(account_id.into()))
181 .or_else(|| {
182 last.member_account_set_id
183 .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
184 });
185 end_cursor = Some(AccountSetMembersCursor {
186 id: id.expect("member_id not set"),
187 member_created_at: last.created_at.expect("created_at not set"),
188 });
189 }
190
191 let account_set_members = rows
192 .into_iter()
193 .take(first)
194 .map(
195 |row| match (row.member_account_id, row.member_account_set_id) {
196 (Some(member_account_id), _) => AccountSetMember::from((
197 AccountSetMemberId::Account(AccountId::from(member_account_id)),
198 row.created_at.expect("created at should always be present"),
199 )),
200 (_, Some(member_account_set_id)) => AccountSetMember::from((
201 AccountSetMemberId::AccountSet(AccountSetId::from(member_account_set_id)),
202 row.created_at.expect("created at should always be present"),
203 )),
204 _ => unreachable!(),
205 },
206 )
207 .collect::<Vec<AccountSetMember>>();
208
209 Ok(es_entity::PaginatedQueryRet {
210 entities: account_set_members,
211 has_next_page,
212 end_cursor,
213 })
214 }
215
216 pub async fn add_member_account_and_return_parents(
217 &self,
218 db: &mut Transaction<'_, Postgres>,
219 account_set_id: AccountSetId,
220 account_id: AccountId,
221 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
222 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
223 .execute(&mut **db)
224 .await?;
225 let rows = sqlx::query!(r#"
226 WITH RECURSIVE parents AS (
227 SELECT m.member_account_set_id, m.account_set_id
228 FROM cala_account_set_member_account_sets m
229 JOIN cala_account_sets s
230 ON s.id = m.account_set_id
231 WHERE m.member_account_set_id = $1
232
233 UNION ALL
234 SELECT p.member_account_set_id, m.account_set_id
235 FROM parents p
236 JOIN cala_account_set_member_account_sets m
237 ON p.account_set_id = m.member_account_set_id
238 ),
239 non_transitive_insert AS (
240 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id)
241 VALUES ($1, $2)
242 ),
243 transitive_insert AS (
244 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
245 SELECT p.account_set_id, $2, TRUE
246 FROM parents p
247 )
248 SELECT account_set_id, NULL AS now
249 FROM parents
250 UNION ALL
251 SELECT NULL AS account_set_id, NOW() AS now
252 "#,
253 account_set_id as AccountSetId,
254 account_id as AccountId,
255 )
256 .fetch_all(&mut **db)
257 .await?;
258 let mut time = None;
259 let ret = rows
260 .into_iter()
261 .filter_map(|row| {
262 if let Some(t) = row.now {
263 time = Some(t);
264 None
265 } else {
266 Some(AccountSetId::from(
267 row.account_set_id.expect("account_set_id not set"),
268 ))
269 }
270 })
271 .collect();
272 Ok((time.expect("time not set"), ret))
273 }
274
275 pub async fn remove_member_account_and_return_parents(
276 &self,
277 db: &mut Transaction<'_, Postgres>,
278 account_set_id: AccountSetId,
279 account_id: AccountId,
280 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
281 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
282 .execute(&mut **db)
283 .await?;
284 let rows = sqlx::query!(
285 r#"
286 WITH RECURSIVE parents AS (
287 SELECT m.member_account_set_id, m.account_set_id
288 FROM cala_account_set_member_account_sets m
289 JOIN cala_account_sets s
290 ON s.id = m.account_set_id
291 WHERE m.member_account_set_id = $1
292
293 UNION ALL
294 SELECT p.member_account_set_id, m.account_set_id
295 FROM parents p
296 JOIN cala_account_set_member_account_sets m
297 ON p.account_set_id = m.member_account_set_id
298 ),
299 deletions as (
300 DELETE FROM cala_account_set_member_accounts
301 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
302 AND member_account_id = $2
303 )
304 SELECT account_set_id, NULL AS now
305 FROM parents
306 UNION ALL
307 SELECT NULL AS account_set_id, NOW() AS now
308 "#,
309 account_set_id as AccountSetId,
310 account_id as AccountId,
311 )
312 .fetch_all(&mut **db)
313 .await?;
314 let mut time = None;
315 let ret = rows
316 .into_iter()
317 .filter_map(|row| {
318 if let Some(t) = row.now {
319 time = Some(t);
320 None
321 } else {
322 Some(AccountSetId::from(
323 row.account_set_id.expect("account_set_id not set"),
324 ))
325 }
326 })
327 .collect();
328 Ok((time.expect("time not set"), ret))
329 }
330
331 pub async fn add_member_set_and_return_parents(
332 &self,
333 db: &mut Transaction<'_, Postgres>,
334 account_set_id: AccountSetId,
335 member_account_set_id: AccountSetId,
336 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
337 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
338 .execute(&mut **db)
339 .await?;
340 let rows = sqlx::query!(r#"
341 WITH RECURSIVE parents AS (
342 SELECT m.member_account_set_id, m.account_set_id
343 FROM cala_account_set_member_account_sets m
344 JOIN cala_account_sets s
345 ON s.id = m.account_set_id
346 WHERE m.member_account_set_id = $1
347
348 UNION ALL
349 SELECT p.member_account_set_id, m.account_set_id
350 FROM parents p
351 JOIN cala_account_set_member_account_sets m
352 ON p.account_set_id = m.member_account_set_id
353 ),
354 set_insert AS (
355 INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id)
356 VALUES ($1, $2)
357 ),
358 new_members AS (
359 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
360 SELECT $1, m.member_account_id, TRUE
361 FROM cala_account_set_member_accounts m
362 WHERE m.account_set_id = $2
363 RETURNING member_account_id
364 ),
365 transitive_inserts AS (
366 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
367 SELECT p.account_set_id, n.member_account_id, TRUE
368 FROM parents p
369 CROSS JOIN new_members n
370 )
371 SELECT account_set_id, NULL AS now
372 FROM parents
373 UNION ALL
374 SELECT NULL AS account_set_id, NOW() AS now
375 "#,
376 account_set_id as AccountSetId,
377 member_account_set_id as AccountSetId,
378 )
379 .fetch_all(&mut **db)
380 .await?;
381 let mut time = None;
382 let ret = rows
383 .into_iter()
384 .filter_map(|row| {
385 if let Some(t) = row.now {
386 time = Some(t);
387 None
388 } else {
389 Some(AccountSetId::from(
390 row.account_set_id.expect("account_set_id not set"),
391 ))
392 }
393 })
394 .collect();
395 Ok((time.expect("time not set"), ret))
396 }
397
398 pub async fn remove_member_set_and_return_parents(
399 &self,
400 db: &mut Transaction<'_, Postgres>,
401 account_set_id: AccountSetId,
402 member_account_set_id: AccountSetId,
403 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
404 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
405 .execute(&mut **db)
406 .await?;
407 let rows = sqlx::query!(
408 r#"
409 WITH RECURSIVE parents AS (
410 SELECT m.member_account_set_id, m.account_set_id
411 FROM cala_account_set_member_account_sets m
412 JOIN cala_account_sets s
413 ON s.id = m.account_set_id
414 WHERE m.member_account_set_id = $1
415
416 UNION ALL
417 SELECT p.member_account_set_id, m.account_set_id
418 FROM parents p
419 JOIN cala_account_set_member_account_sets m
420 ON p.account_set_id = m.member_account_set_id
421 ),
422 member_accounts_deletion AS (
423 DELETE FROM cala_account_set_member_accounts
424 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
425 AND member_account_id IN (SELECT member_account_id FROM cala_account_set_member_accounts
426 WHERE account_set_id = $2)
427 ),
428 member_account_set_deletion AS (
429 DELETE FROM cala_account_set_member_account_sets
430 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
431 AND member_account_set_id = $2
432 )
433 SELECT account_set_id, NULL AS now
434 FROM parents
435 UNION ALL
436 SELECT NULL AS account_set_id, NOW() AS now
437 "#,
438 account_set_id as AccountSetId,
439 member_account_set_id as AccountSetId,
440 )
441 .fetch_all(&mut **db)
442 .await?;
443 let mut time = None;
444 let ret = rows
445 .into_iter()
446 .filter_map(|row| {
447 if let Some(t) = row.now {
448 time = Some(t);
449 None
450 } else {
451 Some(AccountSetId::from(
452 row.account_set_id.expect("account_set_id not set"),
453 ))
454 }
455 })
456 .collect();
457 Ok((time.expect("time not set"), ret))
458 }
459
460 pub async fn find_where_account_is_member(
461 &self,
462 account_id: AccountId,
463 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
464 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
465 {
466 self.find_where_account_is_member_in_executor(&self.pool, account_id, query)
467 .await
468 }
469
470 pub async fn find_where_account_is_member_in_tx(
471 &self,
472 tx: &mut Transaction<'_, Postgres>,
473 account_id: AccountId,
474 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
475 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
476 {
477 self.find_where_account_is_member_in_executor(&mut **tx, account_id, query)
478 .await
479 }
480
481 async fn find_where_account_is_member_in_executor(
482 &self,
483 executor: impl Executor<'_, Database = Postgres>,
484 account_id: AccountId,
485 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
486 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
487 {
488 let rows = sqlx::query_as!(
489 account_set_repo_types::Repo__DbEvent,
490 r#"
491 WITH member_account_sets AS (
492 SELECT a.id, a.name, a.created_at
493 FROM cala_account_set_member_accounts asm
494 JOIN cala_account_sets a ON asm.account_set_id = a.id
495 WHERE asm.member_account_id = $1 AND transitive IS FALSE
496 AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
497 ORDER BY a.name, a.id
498 LIMIT $4
499 )
500 SELECT mas.id AS "entity_id!: AccountSetId", e.sequence, e.event, e.recorded_at
501 FROM member_account_sets mas
502 JOIN cala_account_set_events e ON mas.id = e.id
503 ORDER BY mas.name, mas.id, e.sequence
504 "#,
505 account_id as AccountId,
506 query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
507 query.after.map(|c| c.name),
508 query.first as i64 + 1
509 )
510 .fetch_all(executor)
511 .await?;
512
513 let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
514 let mut end_cursor = None;
515 if let Some(last) = entities.last() {
516 end_cursor = Some(AccountSetsByNameCursor {
517 id: last.values().id,
518 name: last.values().name.clone(),
519 });
520 }
521 Ok(es_entity::PaginatedQueryRet {
522 entities,
523 has_next_page,
524 end_cursor,
525 })
526 }
527
528 pub async fn find_where_account_set_is_member(
529 &self,
530 account_set_id: AccountSetId,
531 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
532 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
533 {
534 self.find_where_account_set_is_member_in_executor(&self.pool, account_set_id, query)
535 .await
536 }
537
538 pub async fn find_where_account_set_is_member_in_tx(
539 &self,
540 tx: &mut Transaction<'_, Postgres>,
541 account_set_id: AccountSetId,
542 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
543 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
544 {
545 self.find_where_account_set_is_member_in_executor(&mut **tx, account_set_id, query)
546 .await
547 }
548
549 async fn find_where_account_set_is_member_in_executor(
550 &self,
551 executor: impl Executor<'_, Database = Postgres>,
552 account_set_id: AccountSetId,
553 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
554 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
555 {
556 let rows = sqlx::query_as!(
557 account_set_repo_types::Repo__DbEvent,
558 r#"
559 WITH member_account_sets AS (
560 SELECT a.id, a.name, a.created_at
561 FROM cala_account_set_member_account_sets asm
562 JOIN cala_account_sets a ON asm.account_set_id = a.id
563 WHERE asm.member_account_set_id = $1
564 AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
565 ORDER BY a.name, a.id
566 LIMIT $4
567 )
568 SELECT mas.id AS "entity_id!: AccountSetId", e.sequence, e.event, e.recorded_at
569 FROM member_account_sets mas
570 JOIN cala_account_set_events e ON mas.id = e.id
571 ORDER BY mas.name, mas.id, e.sequence
572 "#,
573 account_set_id as AccountSetId,
574 query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
575 query.after.map(|c| c.name),
576 query.first as i64 + 1
577 )
578 .fetch_all(executor)
579 .await?;
580
581 let (entities, has_next_page) = EntityEvents::load_n::<AccountSet>(rows, query.first)?;
582 let mut end_cursor = None;
583 if let Some(last) = entities.last() {
584 end_cursor = Some(AccountSetsByNameCursor {
585 id: last.values().id,
586 name: last.values().name.clone(),
587 });
588 }
589 Ok(es_entity::PaginatedQueryRet {
590 entities,
591 has_next_page,
592 end_cursor,
593 })
594 }
595
596 #[cfg(feature = "import")]
597 pub async fn import_in_op(
598 &self,
599 op: &mut DbOp<'_>,
600 origin: DataSourceId,
601 account_set: &mut AccountSet,
602 ) -> Result<(), AccountSetError> {
603 let recorded_at = op.now();
604 sqlx::query!(
605 r#"INSERT INTO cala_account_sets (data_source_id, id, journal_id, name, external_id, created_at)
606 VALUES ($1, $2, $3, $4, $5, $6)"#,
607 origin as DataSourceId,
608 account_set.values().id as AccountSetId,
609 account_set.values().journal_id as JournalId,
610 account_set.values().name,
611 account_set.values().external_id,
612 recorded_at
613 )
614 .execute(&mut **op.tx())
615 .await?;
616 self.persist_events(op, &mut account_set.events).await?;
617 Ok(())
618 }
619
620 pub async fn fetch_mappings(
621 &self,
622 journal_id: JournalId,
623 account_ids: &[AccountId],
624 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
625 let rows = sqlx::query!(
626 r#"
627 SELECT m.account_set_id AS "set_id!: AccountSetId", m.member_account_id AS "account_id!: AccountId"
628 FROM cala_account_set_member_accounts m
629 JOIN cala_account_sets s
630 ON m.account_set_id = s.id AND s.journal_id = $1
631 WHERE m.member_account_id = ANY($2)
632 "#,
633 journal_id as JournalId,
634 account_ids as &[AccountId]
635 )
636 .fetch_all(&self.pool)
637 .await?;
638 let mut mappings = HashMap::new();
639 for row in rows {
640 mappings
641 .entry(row.account_id)
642 .or_insert_with(Vec::new)
643 .push(row.set_id);
644 }
645 Ok(mappings)
646 }
647
648 #[cfg(feature = "import")]
649 pub async fn import_member_account_in_op(
650 &self,
651 op: &mut DbOp<'_>,
652 account_set_id: AccountSetId,
653 account_id: AccountId,
654 ) -> Result<(), AccountSetError> {
655 let recorded_at = op.now();
656 sqlx::query!(
657 r#"INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, created_at)
658 VALUES ($1, $2, $3)"#,
659 account_set_id as AccountSetId,
660 account_id as AccountId,
661 recorded_at
662 )
663 .execute(&mut **op.tx())
664 .await?;
665 Ok(())
666 }
667
668 #[cfg(feature = "import")]
669 pub async fn import_remove_member_account(
670 &self,
671 db: &mut Transaction<'_, Postgres>,
672 account_set_id: AccountSetId,
673 account_id: AccountId,
674 ) -> Result<(), AccountSetError> {
675 sqlx::query!(
676 r#"DELETE FROM cala_account_set_member_accounts
677 WHERE account_set_id = $1 AND member_account_id = $2"#,
678 account_set_id as AccountSetId,
679 account_id as AccountId,
680 )
681 .execute(&mut **db)
682 .await?;
683 Ok(())
684 }
685
686 #[cfg(feature = "import")]
687 pub async fn import_member_set_in_op(
688 &self,
689 op: &mut DbOp<'_>,
690 account_set_id: AccountSetId,
691 member_account_set_id: AccountSetId,
692 ) -> Result<(), AccountSetError> {
693 let recorded_at = op.now();
694 sqlx::query!(
695 r#"INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id, created_at)
696 VALUES ($1, $2, $3)"#,
697 account_set_id as AccountSetId,
698 member_account_set_id as AccountSetId,
699 recorded_at
700 )
701 .execute(&mut **op.tx())
702 .await?;
703 Ok(())
704 }
705
706 #[cfg(feature = "import")]
707 pub async fn import_remove_member_set(
708 &self,
709 db: &mut Transaction<'_, Postgres>,
710 account_set_id: AccountSetId,
711 member_account_set_id: AccountSetId,
712 ) -> Result<(), AccountSetError> {
713 sqlx::query!(
714 r#"DELETE FROM cala_account_set_member_account_sets
715 WHERE account_set_id = $1 AND member_account_set_id = $2"#,
716 account_set_id as AccountSetId,
717 member_account_set_id as AccountSetId,
718 )
719 .execute(&mut **db)
720 .await?;
721 Ok(())
722 }
723}