1use chrono::{DateTime, Utc};
2use es_entity::*;
3use sqlx::PgPool;
4use tracing::instrument;
5
6use std::collections::HashMap;
7
8use crate::{
9 outbox::OutboxPublisher,
10 primitives::{AccountId, JournalId},
11};
12
13use super::{entity::*, error::*};
14
15const ADDVISORY_LOCK_ID: i64 = 123456;
16
17pub mod members_cursor {
18 use cala_types::account_set::{
19 AccountSetMember, AccountSetMemberByExternalId, AccountSetMemberId,
20 };
21 use serde::{Deserialize, Serialize};
22
23 #[derive(Debug, Serialize, Deserialize)]
24 pub struct AccountSetMembersByCreatedAtCursor {
25 pub id: AccountSetMemberId,
26 pub member_created_at: chrono::DateTime<chrono::Utc>,
27 }
28
29 impl From<&AccountSetMember> for AccountSetMembersByCreatedAtCursor {
30 fn from(member: &AccountSetMember) -> Self {
31 Self {
32 id: member.id,
33 member_created_at: member.created_at,
34 }
35 }
36 }
37
38 #[cfg(feature = "graphql")]
39 impl async_graphql::connection::CursorType for AccountSetMembersByCreatedAtCursor {
40 type Error = String;
41
42 fn encode_cursor(&self) -> String {
43 use base64::{engine::general_purpose, Engine as _};
44 let json = serde_json::to_string(&self).expect("could not serialize token");
45 general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
46 }
47
48 fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
49 use base64::{engine::general_purpose, Engine as _};
50 let bytes = general_purpose::STANDARD_NO_PAD
51 .decode(s.as_bytes())
52 .map_err(|e| e.to_string())?;
53 let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
54 serde_json::from_str(&json).map_err(|e| e.to_string())
55 }
56 }
57
58 #[derive(Debug, Serialize, Deserialize)]
59 pub struct AccountSetMembersByExternalIdCursor {
60 pub id: AccountSetMemberId,
61 pub external_id: Option<String>,
62 }
63
64 impl From<&AccountSetMemberByExternalId> for AccountSetMembersByExternalIdCursor {
65 fn from(member: &AccountSetMemberByExternalId) -> Self {
66 Self {
67 id: member.id,
68 external_id: member.external_id.clone(),
69 }
70 }
71 }
72
73 #[cfg(feature = "graphql")]
74 impl async_graphql::connection::CursorType for AccountSetMembersByExternalIdCursor {
75 type Error = String;
76
77 fn encode_cursor(&self) -> String {
78 use base64::{engine::general_purpose, Engine as _};
79 let json = serde_json::to_string(&self).expect("could not serialize token");
80 general_purpose::STANDARD_NO_PAD.encode(json.as_bytes())
81 }
82
83 fn decode_cursor(s: &str) -> Result<Self, Self::Error> {
84 use base64::{engine::general_purpose, Engine as _};
85 let bytes = general_purpose::STANDARD_NO_PAD
86 .decode(s.as_bytes())
87 .map_err(|e| e.to_string())?;
88 let json = String::from_utf8(bytes).map_err(|e| e.to_string())?;
89 serde_json::from_str(&json).map_err(|e| e.to_string())
90 }
91 }
92}
93
94use account_set_cursor::*;
95use members_cursor::*;
96
97#[derive(EsRepo, Debug, Clone)]
98#[es_repo(
99 entity = "AccountSet",
100 err = "AccountSetError",
101 columns(
102 name(
103 ty = "String",
104 update(accessor = "values().name"),
105 list_by,
106 list_for(by(created_at))
107 ),
108 journal_id(ty = "JournalId", update(persist = false)),
109 external_id(
110 ty = "Option<String>",
111 update(accessor = "values().external_id"),
112 list_by
113 ),
114 ),
115 tbl_prefix = "cala",
116 post_persist_hook = "publish",
117 persist_event_context = false
118)]
119pub(super) struct AccountSetRepo {
120 pool: PgPool,
121 publisher: OutboxPublisher,
122}
123
124impl AccountSetRepo {
125 pub fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
126 Self {
127 pool: pool.clone(),
128 publisher: publisher.clone(),
129 }
130 }
131
132 pub async fn list_children_by_created_at(
133 &self,
134 id: AccountSetId,
135 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
136 ) -> Result<
137 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
138 AccountSetError,
139 > {
140 self.list_children_by_created_at_in_op(&self.pool, id, args)
141 .await
142 }
143
144 #[instrument(
145 name = "account_set.list_children_by_created_at_in_op",
146 skip_all,
147 err(level = "warn")
148 )]
149 pub async fn list_children_by_created_at_in_op(
150 &self,
151 op: impl es_entity::IntoOneTimeExecutor<'_>,
152 account_set_id: AccountSetId,
153 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
154 ) -> Result<
155 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
156 AccountSetError,
157 > {
158 let es_entity::PaginatedQueryArgs { first, after } = args;
159 let (member_id, created_at) = if let Some(after) = after {
160 (Some(after.id), Some(after.member_created_at))
161 } else {
162 (None, None)
163 };
164
165 let id = match member_id {
166 Some(member_id) => match member_id {
167 AccountSetMemberId::Account(id) => Some(id),
168 AccountSetMemberId::AccountSet(id) => Some(id.into()),
169 },
170 None => None,
171 };
172
173 let rows = op
174 .into_executor()
175 .fetch_all(sqlx::query!(
176 r#"
177 WITH member_accounts AS (
178 SELECT
179 member_account_id AS member_id,
180 member_account_id,
181 NULL::uuid AS member_account_set_id,
182 created_at
183 FROM cala_account_set_member_accounts
184 WHERE
185 transitive IS FALSE
186 AND account_set_id = $4
187 AND (COALESCE((created_at, member_account_id) < ($3, $2), $2 IS NULL))
188 ORDER BY created_at DESC, member_account_id DESC
189 LIMIT $1
190 ), member_sets AS (
191 SELECT
192 member_account_set_id AS member_id,
193 NULL::uuid AS member_account_id,
194 member_account_set_id,
195 created_at
196 FROM cala_account_set_member_account_sets
197 WHERE
198 account_set_id = $4
199 AND (COALESCE((created_at, member_account_set_id) < ($3, $2), $2 IS NULL))
200 ORDER BY created_at DESC, member_account_set_id DESC
201 LIMIT $1
202 ), all_members AS (
203 SELECT * FROM member_accounts
204 UNION ALL
205 SELECT * FROM member_sets
206 )
207 SELECT * FROM all_members
208 ORDER BY created_at DESC, member_id DESC
209 LIMIT $1
210 "#,
211 (first + 1) as i64,
212 id.map(uuid::Uuid::from),
213 created_at,
214 uuid::Uuid::from(account_set_id),
215 ))
216 .await?;
217 let has_next_page = rows.len() > first;
218 let mut end_cursor = None;
219 if let Some(last) = rows.last() {
220 let id = last
221 .member_account_id
222 .map(|account_id| AccountSetMemberId::Account(account_id.into()))
223 .or_else(|| {
224 last.member_account_set_id
225 .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
226 });
227 end_cursor = Some(AccountSetMembersByCreatedAtCursor {
228 id: id.expect("member_id not set"),
229 member_created_at: last.created_at.expect("created_at not set"),
230 });
231 }
232
233 let account_set_members = rows
234 .into_iter()
235 .take(first)
236 .map(
237 |row| match (row.member_account_id, row.member_account_set_id) {
238 (Some(member_account_id), _) => AccountSetMember::from((
239 AccountSetMemberId::Account(AccountId::from(member_account_id)),
240 row.created_at.expect("created at should always be present"),
241 )),
242 (_, Some(member_account_set_id)) => AccountSetMember::from((
243 AccountSetMemberId::AccountSet(AccountSetId::from(member_account_set_id)),
244 row.created_at.expect("created at should always be present"),
245 )),
246 _ => unreachable!(),
247 },
248 )
249 .collect::<Vec<AccountSetMember>>();
250
251 Ok(es_entity::PaginatedQueryRet {
252 entities: account_set_members,
253 has_next_page,
254 end_cursor,
255 })
256 }
257
258 pub async fn list_children_by_external_id(
259 &self,
260 id: AccountSetId,
261 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
262 ) -> Result<
263 es_entity::PaginatedQueryRet<
264 AccountSetMemberByExternalId,
265 AccountSetMembersByExternalIdCursor,
266 >,
267 AccountSetError,
268 > {
269 self.list_children_by_external_id_in_op(&self.pool, id, args)
270 .await
271 }
272
273 pub async fn list_children_by_external_id_in_op(
274 &self,
275 op: impl es_entity::IntoOneTimeExecutor<'_>,
276 account_set_id: AccountSetId,
277 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
278 ) -> Result<
279 es_entity::PaginatedQueryRet<
280 AccountSetMemberByExternalId,
281 AccountSetMembersByExternalIdCursor,
282 >,
283 AccountSetError,
284 > {
285 let es_entity::PaginatedQueryArgs { first, after } = args;
286 let (member_id, external_id) = if let Some(after) = after {
287 (Some(after.id), after.external_id)
288 } else {
289 (None, None)
290 };
291
292 let id = match member_id {
293 Some(member_id) => match member_id {
294 AccountSetMemberId::Account(id) => Some(id),
295 AccountSetMemberId::AccountSet(id) => Some(id.into()),
296 },
297 None => None,
298 };
299
300 let rows = op
301 .into_executor()
302 .fetch_all(sqlx::query!(
303 r#"
304 WITH member_accounts AS (
305 SELECT
306 member_account_id AS member_id,
307 member_account_id,
308 NULL::uuid AS member_account_set_id,
309 a.external_id
310 FROM cala_account_set_member_accounts m
311 LEFT JOIN cala_accounts a ON m.member_account_id = a.id
312 WHERE
313 transitive IS FALSE
314 AND m.account_set_id = $4
315 AND (
316 ($3::varchar IS NULL) OR
317 (a.external_id IS NULL AND $3::varchar IS NOT NULL) OR
318 (a.external_id > $3::varchar) OR
319 (a.external_id = $3::varchar AND member_account_id > $2)
320 )
321 ORDER BY a.external_id ASC NULLS LAST, member_account_id ASC
322 LIMIT $1
323 ), member_sets AS (
324 SELECT
325 member_account_set_id AS member_id,
326 NULL::uuid AS member_account_id,
327 member_account_set_id,
328 s.external_id
329 FROM cala_account_set_member_account_sets m
330 LEFT JOIN cala_account_sets s ON m.member_account_set_id = s.id
331 WHERE
332 m.account_set_id = $4
333 AND (
334 ($3::varchar IS NULL) OR
335 (s.external_id IS NULL AND $3::varchar IS NOT NULL) OR
336 (s.external_id > $3::varchar) OR
337 (s.external_id = $3::varchar AND member_account_set_id > $2)
338 )
339 ORDER BY s.external_id ASC NULLS LAST, member_account_set_id ASC
340 LIMIT $1
341 ), all_members AS (
342 SELECT * FROM member_accounts
343 UNION ALL
344 SELECT * FROM member_sets
345 )
346 SELECT * FROM all_members
347 ORDER BY external_id ASC NULLS LAST, member_id ASC
348 LIMIT $1
349 "#,
350 (first + 1) as i64,
351 id.map(uuid::Uuid::from),
352 external_id,
353 uuid::Uuid::from(account_set_id),
354 ))
355 .await?;
356
357 let has_next_page = rows.len() > first;
358 let mut end_cursor = None;
359 if let Some(last) = rows.last() {
360 let id = last
361 .member_account_id
362 .map(|account_id| AccountSetMemberId::Account(account_id.into()))
363 .or_else(|| {
364 last.member_account_set_id
365 .map(|account_set_id| AccountSetMemberId::AccountSet(account_set_id.into()))
366 });
367 end_cursor = Some(AccountSetMembersByExternalIdCursor {
368 id: id.expect("member_id not set"),
369 external_id: last.external_id.clone(),
370 });
371 }
372
373 let account_set_members = rows
374 .into_iter()
375 .take(first)
376 .map(
377 |row| match (row.member_account_id, row.member_account_set_id) {
378 (Some(member_account_id), _) => AccountSetMemberByExternalId {
379 id: AccountSetMemberId::Account(AccountId::from(member_account_id)),
380 external_id: row.external_id,
381 },
382 (_, Some(member_account_set_id)) => AccountSetMemberByExternalId {
383 id: AccountSetMemberId::AccountSet(AccountSetId::from(
384 member_account_set_id,
385 )),
386 external_id: row.external_id,
387 },
388 _ => unreachable!(),
389 },
390 )
391 .collect::<Vec<AccountSetMemberByExternalId>>();
392
393 Ok(es_entity::PaginatedQueryRet {
394 entities: account_set_members,
395 has_next_page,
396 end_cursor,
397 })
398 }
399
400 #[instrument(
401 name = "account_set.add_member_account_and_return_parents",
402 skip_all,
403 err(level = "warn")
404 )]
405 pub async fn add_member_account_and_return_parents(
406 &self,
407 db: &mut impl es_entity::AtomicOperation,
408 account_set_id: AccountSetId,
409 account_id: AccountId,
410 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
411 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
412 .execute(db.as_executor())
413 .await?;
414 let rows = sqlx::query!(r#"
415 WITH RECURSIVE parents AS (
416 SELECT m.member_account_set_id, m.account_set_id
417 FROM cala_account_set_member_account_sets m
418 JOIN cala_account_sets s
419 ON s.id = m.account_set_id
420 WHERE m.member_account_set_id = $1
421
422 UNION ALL
423 SELECT p.member_account_set_id, m.account_set_id
424 FROM parents p
425 JOIN cala_account_set_member_account_sets m
426 ON p.account_set_id = m.member_account_set_id
427 ),
428 non_transitive_insert AS (
429 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id)
430 VALUES ($1, $2)
431 ),
432 transitive_insert AS (
433 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
434 SELECT p.account_set_id, $2, TRUE
435 FROM parents p
436 )
437 SELECT account_set_id, NULL AS now
438 FROM parents
439 UNION ALL
440 SELECT NULL AS account_set_id, NOW() AS now
441 "#,
442 account_set_id as AccountSetId,
443 account_id as AccountId,
444 )
445 .fetch_all(db.as_executor())
446 .await?;
447 let mut time = None;
448 let ret = rows
449 .into_iter()
450 .filter_map(|row| {
451 if let Some(t) = row.now {
452 time = Some(t);
453 None
454 } else {
455 Some(AccountSetId::from(
456 row.account_set_id.expect("account_set_id not set"),
457 ))
458 }
459 })
460 .collect();
461
462 self.publisher
463 .publish_all(
464 db,
465 std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
466 account_set_id,
467 member_id: crate::account_set::AccountSetMemberId::Account(account_id),
468 }),
469 )
470 .await?;
471
472 Ok((time.expect("time not set"), ret))
473 }
474
475 pub async fn remove_member_account_and_return_parents(
476 &self,
477 db: &mut impl es_entity::AtomicOperation,
478 account_set_id: AccountSetId,
479 account_id: AccountId,
480 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
481 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
482 .execute(db.as_executor())
483 .await?;
484 let rows = sqlx::query!(
485 r#"
486 WITH RECURSIVE parents AS (
487 SELECT m.member_account_set_id, m.account_set_id
488 FROM cala_account_set_member_account_sets m
489 JOIN cala_account_sets s
490 ON s.id = m.account_set_id
491 WHERE m.member_account_set_id = $1
492
493 UNION ALL
494 SELECT p.member_account_set_id, m.account_set_id
495 FROM parents p
496 JOIN cala_account_set_member_account_sets m
497 ON p.account_set_id = m.member_account_set_id
498 ),
499 deletions as (
500 DELETE FROM cala_account_set_member_accounts
501 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
502 AND member_account_id = $2
503 )
504 SELECT account_set_id, NULL AS now
505 FROM parents
506 UNION ALL
507 SELECT NULL AS account_set_id, NOW() AS now
508 "#,
509 account_set_id as AccountSetId,
510 account_id as AccountId,
511 )
512 .fetch_all(db.as_executor())
513 .await?;
514 let mut time = None;
515 let ret = rows
516 .into_iter()
517 .filter_map(|row| {
518 if let Some(t) = row.now {
519 time = Some(t);
520 None
521 } else {
522 Some(AccountSetId::from(
523 row.account_set_id.expect("account_set_id not set"),
524 ))
525 }
526 })
527 .collect();
528
529 self.publisher
530 .publish_all(
531 db,
532 std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
533 account_set_id,
534 member_id: crate::account_set::AccountSetMemberId::Account(account_id),
535 }),
536 )
537 .await?;
538
539 Ok((time.expect("time not set"), ret))
540 }
541
542 pub async fn add_member_set_and_return_parents(
543 &self,
544 db: &mut impl es_entity::AtomicOperation,
545 account_set_id: AccountSetId,
546 member_account_set_id: AccountSetId,
547 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
548 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
549 .execute(db.as_executor())
550 .await?;
551 let rows = sqlx::query!(r#"
552 WITH RECURSIVE parents AS (
553 SELECT m.member_account_set_id, m.account_set_id
554 FROM cala_account_set_member_account_sets m
555 JOIN cala_account_sets s
556 ON s.id = m.account_set_id
557 WHERE m.member_account_set_id = $1
558
559 UNION ALL
560 SELECT p.member_account_set_id, m.account_set_id
561 FROM parents p
562 JOIN cala_account_set_member_account_sets m
563 ON p.account_set_id = m.member_account_set_id
564 ),
565 set_insert AS (
566 INSERT INTO cala_account_set_member_account_sets (account_set_id, member_account_set_id)
567 VALUES ($1, $2)
568 ),
569 new_members AS (
570 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
571 SELECT $1, m.member_account_id, TRUE
572 FROM cala_account_set_member_accounts m
573 WHERE m.account_set_id = $2
574 RETURNING member_account_id
575 ),
576 transitive_inserts AS (
577 INSERT INTO cala_account_set_member_accounts (account_set_id, member_account_id, transitive)
578 SELECT p.account_set_id, n.member_account_id, TRUE
579 FROM parents p
580 CROSS JOIN new_members n
581 )
582 SELECT account_set_id, NULL AS now
583 FROM parents
584 UNION ALL
585 SELECT NULL AS account_set_id, NOW() AS now
586 "#,
587 account_set_id as AccountSetId,
588 member_account_set_id as AccountSetId,
589 )
590 .fetch_all(db.as_executor())
591 .await?;
592 let mut time = None;
593 let ret = rows
594 .into_iter()
595 .filter_map(|row| {
596 if let Some(t) = row.now {
597 time = Some(t);
598 None
599 } else {
600 Some(AccountSetId::from(
601 row.account_set_id.expect("account_set_id not set"),
602 ))
603 }
604 })
605 .collect();
606
607 self.publisher
608 .publish_all(
609 db,
610 std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberCreated {
611 account_set_id,
612 member_id: crate::account_set::AccountSetMemberId::AccountSet(
613 member_account_set_id,
614 ),
615 }),
616 )
617 .await?;
618
619 Ok((time.expect("time not set"), ret))
620 }
621
622 pub async fn remove_member_set_and_return_parents(
623 &self,
624 db: &mut impl es_entity::AtomicOperation,
625 account_set_id: AccountSetId,
626 member_account_set_id: AccountSetId,
627 ) -> Result<(DateTime<Utc>, Vec<AccountSetId>), AccountSetError> {
628 sqlx::query!("SELECT pg_advisory_xact_lock($1)", ADDVISORY_LOCK_ID)
629 .execute(db.as_executor())
630 .await?;
631 let rows = sqlx::query!(
632 r#"
633 WITH RECURSIVE parents AS (
634 SELECT m.member_account_set_id, m.account_set_id
635 FROM cala_account_set_member_account_sets m
636 JOIN cala_account_sets s
637 ON s.id = m.account_set_id
638 WHERE m.member_account_set_id = $1
639
640 UNION ALL
641 SELECT p.member_account_set_id, m.account_set_id
642 FROM parents p
643 JOIN cala_account_set_member_account_sets m
644 ON p.account_set_id = m.member_account_set_id
645 ),
646 member_accounts_deletion AS (
647 DELETE FROM cala_account_set_member_accounts
648 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
649 AND member_account_id IN (SELECT member_account_id FROM cala_account_set_member_accounts
650 WHERE account_set_id = $2)
651 ),
652 member_account_set_deletion AS (
653 DELETE FROM cala_account_set_member_account_sets
654 WHERE account_set_id IN (SELECT account_set_id FROM parents UNION SELECT $1)
655 AND member_account_set_id = $2
656 )
657 SELECT account_set_id, NULL AS now
658 FROM parents
659 UNION ALL
660 SELECT NULL AS account_set_id, NOW() AS now
661 "#,
662 account_set_id as AccountSetId,
663 member_account_set_id as AccountSetId,
664 )
665 .fetch_all(db.as_executor())
666 .await?;
667 let mut time = None;
668 let ret = rows
669 .into_iter()
670 .filter_map(|row| {
671 if let Some(t) = row.now {
672 time = Some(t);
673 None
674 } else {
675 Some(AccountSetId::from(
676 row.account_set_id.expect("account_set_id not set"),
677 ))
678 }
679 })
680 .collect();
681
682 self.publisher
683 .publish_all(
684 db,
685 std::iter::once(crate::outbox::OutboxEventPayload::AccountSetMemberRemoved {
686 account_set_id,
687 member_id: crate::account_set::AccountSetMemberId::AccountSet(
688 member_account_set_id,
689 ),
690 }),
691 )
692 .await?;
693
694 Ok((time.expect("time not set"), ret))
695 }
696
697 pub async fn find_where_account_is_member(
698 &self,
699 account_id: AccountId,
700 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
701 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
702 {
703 self.find_where_account_is_member_in_op(&self.pool, account_id, query)
704 .await
705 }
706
707 pub async fn find_where_account_is_member_in_op(
708 &self,
709 op: impl es_entity::IntoOneTimeExecutor<'_>,
710 account_id: AccountId,
711 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
712 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
713 {
714 let (entities, has_next_page) = es_entity::es_query!(
715 tbl_prefix = "cala",
716 r#"SELECT a.id, a.name, a.created_at
717 FROM cala_account_sets a
718 JOIN cala_account_set_member_accounts asm
719 ON asm.account_set_id = a.id
720 WHERE asm.member_account_id = $1 AND transitive IS FALSE
721 AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
722 ORDER BY a.name, a.id
723 LIMIT $4"#,
724 account_id as AccountId,
725 query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
726 query.after.map(|c| c.name),
727 query.first as i64 + 1
728 )
729 .fetch_n(op, query.first)
730 .await?;
731
732 let mut end_cursor = None;
733 if let Some(last) = entities.last() {
734 end_cursor = Some(AccountSetsByNameCursor {
735 id: last.values().id,
736 name: last.values().name.clone(),
737 });
738 }
739 Ok(es_entity::PaginatedQueryRet {
740 entities,
741 has_next_page,
742 end_cursor,
743 })
744 }
745
746 pub async fn find_where_account_set_is_member(
747 &self,
748 account_set_id: AccountSetId,
749 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
750 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
751 {
752 self.find_where_account_set_is_member_in_op(&self.pool, account_set_id, query)
753 .await
754 }
755
756 pub async fn find_where_account_set_is_member_in_op(
757 &self,
758 op: impl es_entity::IntoOneTimeExecutor<'_>,
759 account_set_id: AccountSetId,
760 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
761 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
762 {
763 let (entities, has_next_page) = es_entity::es_query!(
764 tbl_prefix = "cala",
765 r#"SELECT a.id, a.name, a.created_at
766 FROM cala_account_sets a
767 JOIN cala_account_set_member_account_sets asm
768 ON asm.account_set_id = a.id
769 WHERE asm.member_account_set_id = $1
770 AND ((a.name, a.id) > ($3, $2) OR ($3 IS NULL AND $2 IS NULL))
771 ORDER BY a.name, a.id
772 LIMIT $4"#,
773 account_set_id as AccountSetId,
774 query.after.as_ref().map(|c| c.id) as Option<AccountSetId>,
775 query.after.map(|c| c.name),
776 query.first as i64 + 1
777 )
778 .fetch_n(op, query.first)
779 .await?;
780 let mut end_cursor = None;
781 if let Some(last) = entities.last() {
782 end_cursor = Some(AccountSetsByNameCursor {
783 id: last.values().id,
784 name: last.values().name.clone(),
785 });
786 }
787 Ok(es_entity::PaginatedQueryRet {
788 entities,
789 has_next_page,
790 end_cursor,
791 })
792 }
793
794 #[instrument(
795 name = "account_set.fetch_mappings_in_op",
796 skip_all,
797 err(level = "warn")
798 )]
799 pub async fn fetch_mappings_in_op(
800 &self,
801 op: impl es_entity::IntoOneTimeExecutor<'_>,
802 journal_id: JournalId,
803 account_ids: &[AccountId],
804 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
805 let rows = op.into_executor().fetch_all(sqlx::query!(
806 r#"
807 SELECT m.account_set_id AS "set_id!: AccountSetId", m.member_account_id AS "account_id!: AccountId"
808 FROM cala_account_set_member_accounts m
809 JOIN cala_account_sets s
810 ON m.account_set_id = s.id AND s.journal_id = $1
811 WHERE m.member_account_id = ANY($2)
812 "#,
813 journal_id as JournalId,
814 account_ids as &[AccountId]
815 ))
816 .await?;
817 let mut mappings = HashMap::new();
818 for row in rows {
819 mappings
820 .entry(row.account_id)
821 .or_insert_with(Vec::new)
822 .push(row.set_id);
823 }
824 Ok(mappings)
825 }
826
827 async fn publish(
828 &self,
829 op: &mut impl es_entity::AtomicOperation,
830 entity: &AccountSet,
831 new_events: es_entity::LastPersisted<'_, AccountSetEvent>,
832 ) -> Result<(), AccountSetError> {
833 self.publisher
834 .publish_entity_events(op, entity, new_events)
835 .await?;
836 Ok(())
837 }
838}