cala_server/graphql/
schema.rs

1use async_graphql::{dataloader::*, types::connection::*, *};
2use cala_ledger::{balance::AccountBalance, primitives::*, tx_template::NewParamDefinition};
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6use crate::{app::CalaApp, extension::*};
7
8use super::{
9    account::*, account_set::*, balance::*, journal::*, loader::*, primitives::*, transaction::*,
10    tx_template::*, velocity::*,
11};
12
13pub type DbOp<'a> = Arc<Mutex<cala_ledger::LedgerOperation<'a>>>;
14
15#[derive(Default)]
16pub struct CoreQuery<E: QueryExtensionMarker> {
17    _phantom: std::marker::PhantomData<E>,
18}
19
20#[Object(name = "Query")]
21impl<E: QueryExtensionMarker> CoreQuery<E> {
22    #[graphql(flatten)]
23    async fn extension(&self) -> E {
24        E::default()
25    }
26
27    async fn account(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Account>> {
28        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
29        Ok(loader.load_one(AccountId::from(id)).await?)
30    }
31
32    async fn account_by_external_id(
33        &self,
34        ctx: &Context<'_>,
35        external_id: String,
36    ) -> async_graphql::Result<Option<Account>> {
37        let app = ctx.data_unchecked::<CalaApp>();
38        match app
39            .ledger()
40            .accounts()
41            .find_by_external_id(external_id)
42            .await
43        {
44            Ok(account) => Ok(Some(account.into())),
45            Err(cala_ledger::account::error::AccountError::CouldNotFindByExternalId(_)) => Ok(None),
46            Err(err) => Err(err.into()),
47        }
48    }
49
50    async fn account_by_code(
51        &self,
52        ctx: &Context<'_>,
53        code: String,
54    ) -> async_graphql::Result<Option<Account>> {
55        let app = ctx.data_unchecked::<CalaApp>();
56        match app.ledger().accounts().find_by_code(code).await {
57            Ok(account) => Ok(Some(account.into())),
58            Err(cala_ledger::account::error::AccountError::CouldNotFindByCode(_)) => Ok(None),
59            Err(err) => Err(err.into()),
60        }
61    }
62
63    async fn accounts(
64        &self,
65        ctx: &Context<'_>,
66        first: i32,
67        after: Option<String>,
68    ) -> Result<Connection<AccountsByNameCursor, Account, EmptyFields, EmptyFields>> {
69        let app = ctx.data_unchecked::<CalaApp>();
70        query(
71            after,
72            None,
73            Some(first),
74            None,
75            |after, _, first, _| async move {
76                let first = first.expect("First always exists");
77                let result = app
78                    .ledger()
79                    .accounts()
80                    .list(cala_ledger::es_entity::PaginatedQueryArgs { first, after })
81                    .await?;
82                let mut connection = Connection::new(false, result.has_next_page);
83                connection
84                    .edges
85                    .extend(result.entities.into_iter().map(|entity| {
86                        let cursor = AccountsByNameCursor::from(&entity);
87                        Edge::new(cursor, Account::from(entity))
88                    }));
89                Ok::<_, async_graphql::Error>(connection)
90            },
91        )
92        .await
93    }
94
95    async fn account_set(
96        &self,
97        ctx: &Context<'_>,
98        id: UUID,
99    ) -> async_graphql::Result<Option<AccountSet>> {
100        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
101        Ok(loader.load_one(AccountSetId::from(id)).await?)
102    }
103
104    async fn journal(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Journal>> {
105        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
106        Ok(loader.load_one(JournalId::from(id)).await?)
107    }
108
109    async fn balance(
110        &self,
111        ctx: &Context<'_>,
112        journal_id: UUID,
113        account_id: UUID,
114        currency: CurrencyCode,
115    ) -> async_graphql::Result<Option<Balance>> {
116        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
117        let journal_id = JournalId::from(journal_id);
118        let account_id = AccountId::from(account_id);
119        let currency = Currency::from(currency);
120        let balance: Option<AccountBalance> =
121            loader.load_one((journal_id, account_id, currency)).await?;
122        Ok(balance.map(Balance::from))
123    }
124
125    async fn balance_in_range(
126        &self,
127        ctx: &Context<'_>,
128        journal_id: UUID,
129        account_id: UUID,
130        currency: CurrencyCode,
131        from: Timestamp,
132        until: Option<Timestamp>,
133    ) -> async_graphql::Result<Option<RangedBalance>> {
134        let app = ctx.data_unchecked::<CalaApp>();
135        match app
136            .ledger()
137            .balances()
138            .find_in_range(
139                JournalId::from(journal_id),
140                AccountId::from(account_id),
141                Currency::from(currency),
142                from.into_inner(),
143                until.map(|ts| ts.into_inner()),
144            )
145            .await
146        {
147            Ok(balance) => Ok(Some(balance.into())),
148            Err(cala_ledger::balance::error::BalanceError::NotFound(_, _, _)) => Ok(None),
149            Err(err) => Err(err.into()),
150        }
151    }
152
153    async fn transaction(
154        &self,
155        ctx: &Context<'_>,
156        id: UUID,
157    ) -> async_graphql::Result<Option<Transaction>> {
158        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
159        Ok(loader.load_one(TransactionId::from(id)).await?)
160    }
161
162    async fn transaction_by_external_id(
163        &self,
164        ctx: &Context<'_>,
165        external_id: String,
166    ) -> async_graphql::Result<Option<Transaction>> {
167        let app = ctx.data_unchecked::<CalaApp>();
168        match app
169            .ledger()
170            .transactions()
171            .find_by_external_id(external_id)
172            .await
173        {
174            Ok(transaction) => Ok(Some(transaction.into())),
175            Err(cala_ledger::transaction::error::TransactionError::CouldNotFindByExternalId(_)) => {
176                Ok(None)
177            }
178            Err(err) => Err(err.into()),
179        }
180    }
181
182    async fn tx_template(
183        &self,
184        ctx: &Context<'_>,
185        id: UUID,
186    ) -> async_graphql::Result<Option<TxTemplate>> {
187        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
188        Ok(loader.load_one(TxTemplateId::from(id)).await?)
189    }
190
191    async fn tx_template_by_code(
192        &self,
193        ctx: &Context<'_>,
194        code: String,
195    ) -> async_graphql::Result<Option<TxTemplate>> {
196        let app = ctx.data_unchecked::<CalaApp>();
197        match app.ledger().tx_templates().find_by_code(code).await {
198            Ok(tx_template) => Ok(Some(tx_template.into())),
199            Err(cala_ledger::tx_template::error::TxTemplateError::CouldNotFindByCode(_)) => {
200                Ok(None)
201            }
202            Err(err) => Err(err.into()),
203        }
204    }
205
206    async fn velocity_limit(
207        &self,
208        ctx: &Context<'_>,
209        id: UUID,
210    ) -> async_graphql::Result<Option<VelocityLimit>> {
211        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
212        Ok(loader.load_one(VelocityLimitId::from(id)).await?)
213    }
214
215    async fn velocity_control(
216        &self,
217        ctx: &Context<'_>,
218        id: UUID,
219    ) -> async_graphql::Result<Option<VelocityControl>> {
220        let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
221        Ok(loader.load_one(VelocityControlId::from(id)).await?)
222    }
223}
224
225#[derive(Default)]
226pub struct CoreMutation<E: MutationExtensionMarker> {
227    _phantom: std::marker::PhantomData<E>,
228}
229
230#[Object(name = "Mutation")]
231impl<E: MutationExtensionMarker> CoreMutation<E> {
232    #[graphql(flatten)]
233    async fn extension(&self) -> E {
234        E::default()
235    }
236
237    async fn account_create(
238        &self,
239        ctx: &Context<'_>,
240        input: AccountCreateInput,
241    ) -> Result<AccountCreatePayload> {
242        let app = ctx.data_unchecked::<CalaApp>();
243        let mut op = ctx
244            .data_unchecked::<DbOp>()
245            .try_lock()
246            .expect("Lock held concurrently");
247        let mut builder = cala_ledger::account::NewAccount::builder();
248        builder
249            .id(input.account_id)
250            .name(input.name)
251            .code(input.code)
252            .normal_balance_type(input.normal_balance_type)
253            .status(input.status);
254
255        if let Some(external_id) = input.external_id {
256            builder.external_id(external_id);
257        }
258        if let Some(description) = input.description {
259            builder.description(description);
260        }
261        if let Some(metadata) = input.metadata {
262            builder.metadata(metadata)?;
263        }
264        let account = app
265            .ledger()
266            .accounts()
267            .create_in_op(&mut op, builder.build()?)
268            .await?;
269
270        if let Some(account_set_ids) = input.account_set_ids {
271            for id in account_set_ids {
272                app.ledger()
273                    .account_sets()
274                    .add_member_in_op(&mut op, AccountSetId::from(id), account.id())
275                    .await?;
276            }
277        }
278
279        Ok(account.into())
280    }
281
282    async fn account_update(
283        &self,
284        ctx: &Context<'_>,
285        id: UUID,
286        input: AccountUpdateInput,
287    ) -> Result<AccountUpdatePayload> {
288        let app = ctx.data_unchecked::<CalaApp>();
289        let mut op = ctx
290            .data_unchecked::<DbOp>()
291            .try_lock()
292            .expect("Lock held concurrently");
293
294        let mut builder = cala_ledger::account::AccountUpdate::default();
295        if let Some(name) = input.name {
296            builder.name(name);
297        }
298        if let Some(code) = input.code {
299            builder.code(code);
300        }
301        if let Some(normal_balance_type) = input.normal_balance_type {
302            builder.normal_balance_type(normal_balance_type);
303        }
304        if let Some(status) = input.status {
305            builder.status(status);
306        }
307        if let Some(external_id) = input.external_id {
308            builder.external_id(external_id);
309        }
310        if let Some(description) = input.description {
311            builder.description(description);
312        }
313        if let Some(metadata) = input.metadata {
314            builder.metadata(metadata)?;
315        }
316
317        let mut account = app.ledger().accounts().find(AccountId::from(id)).await?;
318        account.update(builder);
319        app.ledger()
320            .accounts()
321            .persist_in_op(&mut op, &mut account)
322            .await?;
323
324        Ok(account.into())
325    }
326
327    async fn account_set_create(
328        &self,
329        ctx: &Context<'_>,
330        input: AccountSetCreateInput,
331    ) -> Result<AccountSetCreatePayload> {
332        let app = ctx.data_unchecked::<CalaApp>();
333        let mut op = ctx
334            .data_unchecked::<DbOp>()
335            .try_lock()
336            .expect("Lock held concurrently");
337        let mut builder = cala_ledger::account_set::NewAccountSet::builder();
338        builder
339            .id(input.account_set_id)
340            .journal_id(input.journal_id)
341            .name(input.name)
342            .normal_balance_type(input.normal_balance_type);
343
344        if let Some(description) = input.description {
345            builder.description(description);
346        }
347        if let Some(metadata) = input.metadata {
348            builder.metadata(metadata)?;
349        }
350        let account_set = app
351            .ledger()
352            .account_sets()
353            .create_in_op(&mut op, builder.build()?)
354            .await?;
355
356        Ok(account_set.into())
357    }
358
359    async fn account_set_update(
360        &self,
361        ctx: &Context<'_>,
362        id: UUID,
363        input: AccountSetUpdateInput,
364    ) -> Result<AccountSetUpdatePayload> {
365        let app = ctx.data_unchecked::<CalaApp>();
366        let mut op = ctx
367            .data_unchecked::<DbOp>()
368            .try_lock()
369            .expect("Lock held concurrently");
370        let mut builder = cala_ledger::account_set::AccountSetUpdate::default();
371        if let Some(name) = input.name {
372            builder.name(name);
373        }
374        if let Some(desc) = input.description {
375            builder.description(desc);
376        }
377        if let Some(normal_balance_type) = input.normal_balance_type {
378            builder.normal_balance_type(normal_balance_type);
379        }
380
381        if let Some(metadata) = input.metadata {
382            builder.metadata(metadata)?;
383        }
384
385        let mut account_set = app
386            .ledger()
387            .account_sets()
388            .find(AccountSetId::from(id))
389            .await?;
390        account_set.update(builder);
391
392        app.ledger()
393            .account_sets()
394            .persist_in_op(&mut op, &mut account_set)
395            .await?;
396
397        Ok(account_set.into())
398    }
399
400    async fn add_to_account_set(
401        &self,
402        ctx: &Context<'_>,
403        input: AddToAccountSetInput,
404    ) -> Result<AddToAccountSetPayload> {
405        let app = ctx.data_unchecked::<CalaApp>();
406        let mut op = ctx
407            .data_unchecked::<DbOp>()
408            .try_lock()
409            .expect("Lock held concurrently");
410
411        let account_set = app
412            .ledger()
413            .account_sets()
414            .add_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
415            .await?;
416
417        Ok(account_set.into())
418    }
419
420    async fn remove_from_account_set(
421        &self,
422        ctx: &Context<'_>,
423        input: RemoveFromAccountSetInput,
424    ) -> Result<RemoveFromAccountSetPayload> {
425        let app = ctx.data_unchecked::<CalaApp>();
426        let mut op = ctx
427            .data_unchecked::<DbOp>()
428            .try_lock()
429            .expect("Lock held concurrently");
430
431        let account_set = app
432            .ledger()
433            .account_sets()
434            .remove_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
435            .await?;
436
437        Ok(account_set.into())
438    }
439
440    async fn journal_create(
441        &self,
442        ctx: &Context<'_>,
443        input: JournalCreateInput,
444    ) -> Result<JournalCreatePayload> {
445        let app = ctx.data_unchecked::<CalaApp>();
446        let mut op = ctx
447            .data_unchecked::<DbOp>()
448            .try_lock()
449            .expect("Lock held concurrently");
450        let mut builder = cala_ledger::journal::NewJournal::builder();
451        builder
452            .id(input.journal_id)
453            .name(input.name)
454            .status(input.status);
455        if let Some(description) = input.description {
456            builder.description(description);
457        }
458        let journal = app
459            .ledger()
460            .journals()
461            .create_in_op(&mut op, builder.build()?)
462            .await?;
463
464        Ok(journal.into())
465    }
466
467    async fn journal_update(
468        &self,
469        ctx: &Context<'_>,
470        id: UUID,
471        input: JournalUpdateInput,
472    ) -> Result<JournalUpdatePayload> {
473        let app = ctx.data_unchecked::<CalaApp>();
474        let mut op = ctx
475            .data_unchecked::<DbOp>()
476            .try_lock()
477            .expect("Lock held concurrently");
478
479        let mut builder = cala_ledger::journal::JournalUpdate::default();
480        if let Some(name) = input.name {
481            builder.name(name);
482        }
483        if let Some(status) = input.status {
484            builder.status(status);
485        }
486        if let Some(description) = input.description {
487            builder.description(description);
488        }
489
490        let mut journal = app.ledger().journals().find(JournalId::from(id)).await?;
491        journal.update(builder);
492
493        app.ledger()
494            .journals()
495            .persist_in_op(&mut op, &mut journal)
496            .await?;
497
498        Ok(journal.into())
499    }
500
501    async fn tx_template_create(
502        &self,
503        ctx: &Context<'_>,
504        input: TxTemplateCreateInput,
505    ) -> Result<TxTemplateCreatePayload> {
506        let app = ctx.data_unchecked::<CalaApp>();
507        let mut op = ctx
508            .data_unchecked::<DbOp>()
509            .try_lock()
510            .expect("Lock held concurrently");
511        let mut new_tx_template_transaction_builder =
512            cala_ledger::tx_template::NewTxTemplateTransaction::builder();
513        let TxTemplateTransactionInput {
514            effective,
515            journal_id,
516            correlation_id,
517            external_id,
518            description,
519            metadata,
520        } = input.transaction;
521        new_tx_template_transaction_builder
522            .effective(effective)
523            .journal_id(journal_id);
524        if let Some(correlation_id) = correlation_id {
525            new_tx_template_transaction_builder.correlation_id(correlation_id);
526        };
527        if let Some(external_id) = external_id {
528            new_tx_template_transaction_builder.external_id(external_id);
529        };
530        if let Some(description) = description {
531            new_tx_template_transaction_builder.description(description);
532        };
533        if let Some(metadata) = metadata {
534            new_tx_template_transaction_builder.metadata(metadata);
535        }
536        let new_transaction = new_tx_template_transaction_builder.build()?;
537
538        let mut new_params = Vec::new();
539        if let Some(params) = input.params {
540            for param in params {
541                let mut param_builder = NewParamDefinition::builder();
542                param_builder.name(param.name).r#type(param.r#type.into());
543                if let Some(default) = param.default {
544                    param_builder.default_expr(default);
545                }
546                if let Some(desc) = param.description {
547                    param_builder.description(desc);
548                }
549                let new_param = param_builder.build()?;
550                new_params.push(new_param);
551            }
552        }
553
554        let mut new_entries = Vec::new();
555        for entry in input.entries {
556            let TxTemplateEntryInput {
557                entry_type,
558                account_id,
559                layer,
560                direction,
561                units,
562                currency,
563                description,
564            } = entry;
565            let mut new_entry_input_builder =
566                cala_ledger::tx_template::NewTxTemplateEntry::builder();
567            new_entry_input_builder
568                .entry_type(entry_type)
569                .account_id(account_id)
570                .layer(layer)
571                .direction(direction)
572                .units(units)
573                .currency(currency);
574            if let Some(desc) = description {
575                new_entry_input_builder.description(desc);
576            }
577            let new_entry_input = new_entry_input_builder.build()?;
578            new_entries.push(new_entry_input);
579        }
580
581        let mut new_tx_template_builder = cala_ledger::tx_template::NewTxTemplate::builder();
582        new_tx_template_builder
583            .id(input.tx_template_id)
584            .code(input.code)
585            .transaction(new_transaction)
586            .params(new_params)
587            .entries(new_entries);
588        if let Some(desc) = input.description {
589            new_tx_template_builder.description(desc);
590        }
591        if let Some(metadata) = input.metadata {
592            new_tx_template_builder.metadata(metadata)?;
593        }
594        let new_tx_template = new_tx_template_builder.build()?;
595
596        let tx_template = app
597            .ledger()
598            .tx_templates()
599            .create_in_op(&mut op, new_tx_template)
600            .await?;
601
602        Ok(tx_template.into())
603    }
604
605    async fn transaction_post(
606        &self,
607        ctx: &Context<'_>,
608        input: TransactionInput,
609    ) -> Result<TransactionPostPayload> {
610        let app = ctx.data_unchecked::<CalaApp>();
611        let mut op = ctx
612            .data_unchecked::<DbOp>()
613            .try_lock()
614            .expect("Lock held concurrently");
615        let params = input.params.map(cala_ledger::tx_template::Params::from);
616        let transaction = app
617            .ledger()
618            .post_transaction_in_op(
619                &mut op,
620                input.transaction_id.into(),
621                &input.tx_template_code,
622                params.unwrap_or_default(),
623            )
624            .await?;
625        Ok(transaction.into())
626    }
627
628    async fn velocity_limit_create(
629        &self,
630        ctx: &Context<'_>,
631        input: VelocityLimitCreateInput,
632    ) -> Result<VelocityLimitCreatePayload> {
633        let app = ctx.data_unchecked::<CalaApp>();
634        let mut op = ctx
635            .data_unchecked::<DbOp>()
636            .try_lock()
637            .expect("Lock held concurrently");
638
639        let mut new_velocity_limit_builder = cala_ledger::velocity::NewVelocityLimit::builder();
640        new_velocity_limit_builder
641            .id(input.velocity_limit_id)
642            .name(input.name)
643            .description(input.description);
644
645        if let Some(condition) = input.condition {
646            new_velocity_limit_builder.condition(condition);
647        }
648
649        if let Some(currency) = input.currency {
650            new_velocity_limit_builder.currency(currency);
651        }
652
653        let mut new_window = Vec::new();
654        for partition_key_input in input.window {
655            let mut new_partition_key_builder = cala_ledger::velocity::NewPartitionKey::builder();
656            let partition_key = new_partition_key_builder
657                .alias(partition_key_input.alias)
658                .value(partition_key_input.value)
659                .build()?;
660
661            new_window.push(partition_key);
662        }
663        new_velocity_limit_builder.window(new_window);
664
665        let mut new_limit_builder = cala_ledger::velocity::NewLimit::builder();
666
667        if let Some(timestamp_source) = input.limit.timestamp_source {
668            new_limit_builder.timestamp_source(timestamp_source);
669        }
670
671        let mut new_balance_limits = Vec::new();
672        for balance_limit_input in input.limit.balance {
673            let mut new_balance_limit_builder = cala_ledger::velocity::NewBalanceLimit::builder();
674            new_balance_limit_builder
675                .limit_type(balance_limit_input.limit_type)
676                .layer(balance_limit_input.layer)
677                .amount(balance_limit_input.amount)
678                .enforcement_direction(balance_limit_input.normal_balance_type);
679
680            if let Some(start) = balance_limit_input.start {
681                new_balance_limit_builder.start(start);
682            }
683
684            if let Some(end) = balance_limit_input.end {
685                new_balance_limit_builder.end(end);
686            }
687
688            let new_balance_limit = new_balance_limit_builder.build()?;
689            new_balance_limits.push(new_balance_limit);
690        }
691        new_limit_builder.balance(new_balance_limits);
692        let new_limit = new_limit_builder.build()?;
693
694        new_velocity_limit_builder.limit(new_limit);
695
696        if let Some(params) = input.params {
697            let mut new_params = Vec::new();
698            for param in params {
699                let mut param_builder = NewParamDefinition::builder();
700                param_builder.name(param.name).r#type(param.r#type.into());
701                if let Some(default) = param.default {
702                    param_builder.default_expr(default);
703                }
704                if let Some(description) = param.description {
705                    param_builder.description(description);
706                }
707                let new_param = param_builder.build()?;
708                new_params.push(new_param);
709            }
710            new_velocity_limit_builder.params(new_params);
711        }
712
713        let new_velocity_limit = new_velocity_limit_builder.build()?;
714
715        let velocity_limit = app
716            .ledger()
717            .velocities()
718            .create_limit_in_op(&mut op, new_velocity_limit)
719            .await?;
720
721        Ok(velocity_limit.into())
722    }
723
724    async fn velocity_control_create(
725        &self,
726        ctx: &Context<'_>,
727        input: VelocityControlCreateInput,
728    ) -> Result<VelocityControlCreatePayload> {
729        let app = ctx.data_unchecked::<CalaApp>();
730        let mut op = ctx
731            .data_unchecked::<DbOp>()
732            .try_lock()
733            .expect("Lock held concurrently");
734
735        let mut new_velocity_control_builder = cala_ledger::velocity::NewVelocityControl::builder();
736        new_velocity_control_builder
737            .id(input.velocity_control_id)
738            .name(input.name)
739            .description(input.description);
740
741        if let Some(condition) = input.condition {
742            new_velocity_control_builder.condition(condition);
743        }
744
745        let mut new_velocity_enforcement_builder =
746            cala_ledger::velocity::NewVelocityEnforcement::builder();
747        new_velocity_enforcement_builder.action(input.enforcement.velocity_enforcement_action);
748        let new_velocity_enforcement = new_velocity_enforcement_builder.build()?;
749
750        new_velocity_control_builder.enforcement(new_velocity_enforcement);
751
752        let new_velocity_control = new_velocity_control_builder.build()?;
753        let velocity_control = app
754            .ledger()
755            .velocities()
756            .create_control_in_op(&mut op, new_velocity_control)
757            .await?;
758
759        Ok(velocity_control.into())
760    }
761
762    async fn velocity_control_add_limit(
763        &self,
764        ctx: &Context<'_>,
765        input: VelocityControlAddLimitInput,
766    ) -> Result<VelocityControlAddLimitPayload> {
767        let app = ctx.data_unchecked::<CalaApp>();
768        let mut op = ctx
769            .data_unchecked::<DbOp>()
770            .try_lock()
771            .expect("Lock held concurrently");
772
773        let velocity_limit = app
774            .ledger()
775            .velocities()
776            .add_limit_to_control_in_op(
777                &mut op,
778                input.velocity_control_id.into(),
779                input.velocity_limit_id.into(),
780            )
781            .await?;
782
783        Ok(velocity_limit.into())
784    }
785
786    async fn velocity_control_attach(
787        &self,
788        ctx: &Context<'_>,
789        input: VelocityControlAttachInput,
790    ) -> Result<VelocityControlAttachPayload> {
791        let app = ctx.data_unchecked::<CalaApp>();
792        let mut op = ctx
793            .data_unchecked::<DbOp>()
794            .try_lock()
795            .expect("Lock held concurrently");
796        let params = cala_ledger::tx_template::Params::from(input.params);
797
798        let velocity_control = app
799            .ledger()
800            .velocities()
801            .attach_control_to_account_in_op(
802                &mut op,
803                input.velocity_control_id.into(),
804                input.account_id.into(),
805                params,
806            )
807            .await?;
808
809        Ok(velocity_control.into())
810    }
811}