1use async_graphql::{dataloader::*, types::connection::*, *};
2use cala_ledger::{balance::AccountBalance, primitives::*, tx_template::NewParamDefinition};
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6use crate::{app::CalaApp, extension::*};
7
8use super::{
9 account::*, account_set::*, balance::*, journal::*, loader::*, primitives::*, transaction::*,
10 tx_template::*, velocity::*,
11};
12
13pub type DbOp<'a> = Arc<Mutex<cala_ledger::LedgerOperation<'a>>>;
14
15#[derive(Default)]
16pub struct CoreQuery<E: QueryExtensionMarker> {
17 _phantom: std::marker::PhantomData<E>,
18}
19
20#[Object(name = "Query")]
21impl<E: QueryExtensionMarker> CoreQuery<E> {
22 #[graphql(flatten)]
23 async fn extension(&self) -> E {
24 E::default()
25 }
26
27 async fn account(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Account>> {
28 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
29 Ok(loader.load_one(AccountId::from(id)).await?)
30 }
31
32 async fn account_by_external_id(
33 &self,
34 ctx: &Context<'_>,
35 external_id: String,
36 ) -> async_graphql::Result<Option<Account>> {
37 let app = ctx.data_unchecked::<CalaApp>();
38 match app
39 .ledger()
40 .accounts()
41 .find_by_external_id(external_id)
42 .await
43 {
44 Ok(account) => Ok(Some(account.into())),
45 Err(cala_ledger::account::error::AccountError::CouldNotFindByExternalId(_)) => Ok(None),
46 Err(err) => Err(err.into()),
47 }
48 }
49
50 async fn account_by_code(
51 &self,
52 ctx: &Context<'_>,
53 code: String,
54 ) -> async_graphql::Result<Option<Account>> {
55 let app = ctx.data_unchecked::<CalaApp>();
56 match app.ledger().accounts().find_by_code(code).await {
57 Ok(account) => Ok(Some(account.into())),
58 Err(cala_ledger::account::error::AccountError::CouldNotFindByCode(_)) => Ok(None),
59 Err(err) => Err(err.into()),
60 }
61 }
62
63 async fn accounts(
64 &self,
65 ctx: &Context<'_>,
66 first: i32,
67 after: Option<String>,
68 ) -> Result<Connection<AccountsByNameCursor, Account, EmptyFields, EmptyFields>> {
69 let app = ctx.data_unchecked::<CalaApp>();
70 query(
71 after,
72 None,
73 Some(first),
74 None,
75 |after, _, first, _| async move {
76 let first = first.expect("First always exists");
77 let result = app
78 .ledger()
79 .accounts()
80 .list(cala_ledger::es_entity::PaginatedQueryArgs { first, after })
81 .await?;
82 let mut connection = Connection::new(false, result.has_next_page);
83 connection
84 .edges
85 .extend(result.entities.into_iter().map(|entity| {
86 let cursor = AccountsByNameCursor::from(&entity);
87 Edge::new(cursor, Account::from(entity))
88 }));
89 Ok::<_, async_graphql::Error>(connection)
90 },
91 )
92 .await
93 }
94
95 async fn account_set(
96 &self,
97 ctx: &Context<'_>,
98 id: UUID,
99 ) -> async_graphql::Result<Option<AccountSet>> {
100 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
101 Ok(loader.load_one(AccountSetId::from(id)).await?)
102 }
103
104 async fn journal(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Journal>> {
105 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
106 Ok(loader.load_one(JournalId::from(id)).await?)
107 }
108
109 async fn balance(
110 &self,
111 ctx: &Context<'_>,
112 journal_id: UUID,
113 account_id: UUID,
114 currency: CurrencyCode,
115 ) -> async_graphql::Result<Option<Balance>> {
116 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
117 let journal_id = JournalId::from(journal_id);
118 let account_id = AccountId::from(account_id);
119 let currency = Currency::from(currency);
120 let balance: Option<AccountBalance> =
121 loader.load_one((journal_id, account_id, currency)).await?;
122 Ok(balance.map(Balance::from))
123 }
124
125 async fn balance_in_range(
126 &self,
127 ctx: &Context<'_>,
128 journal_id: UUID,
129 account_id: UUID,
130 currency: CurrencyCode,
131 from: Timestamp,
132 until: Option<Timestamp>,
133 ) -> async_graphql::Result<Option<RangedBalance>> {
134 let app = ctx.data_unchecked::<CalaApp>();
135 match app
136 .ledger()
137 .balances()
138 .find_in_range(
139 JournalId::from(journal_id),
140 AccountId::from(account_id),
141 Currency::from(currency),
142 from.into_inner(),
143 until.map(|ts| ts.into_inner()),
144 )
145 .await
146 {
147 Ok(balance) => Ok(Some(balance.into())),
148 Err(cala_ledger::balance::error::BalanceError::NotFound(_, _, _)) => Ok(None),
149 Err(err) => Err(err.into()),
150 }
151 }
152
153 async fn transaction(
154 &self,
155 ctx: &Context<'_>,
156 id: UUID,
157 ) -> async_graphql::Result<Option<Transaction>> {
158 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
159 Ok(loader.load_one(TransactionId::from(id)).await?)
160 }
161
162 async fn transaction_by_external_id(
163 &self,
164 ctx: &Context<'_>,
165 external_id: String,
166 ) -> async_graphql::Result<Option<Transaction>> {
167 let app = ctx.data_unchecked::<CalaApp>();
168 match app
169 .ledger()
170 .transactions()
171 .find_by_external_id(external_id)
172 .await
173 {
174 Ok(transaction) => Ok(Some(transaction.into())),
175 Err(cala_ledger::transaction::error::TransactionError::CouldNotFindByExternalId(_)) => {
176 Ok(None)
177 }
178 Err(err) => Err(err.into()),
179 }
180 }
181
182 async fn tx_template(
183 &self,
184 ctx: &Context<'_>,
185 id: UUID,
186 ) -> async_graphql::Result<Option<TxTemplate>> {
187 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
188 Ok(loader.load_one(TxTemplateId::from(id)).await?)
189 }
190
191 async fn tx_template_by_code(
192 &self,
193 ctx: &Context<'_>,
194 code: String,
195 ) -> async_graphql::Result<Option<TxTemplate>> {
196 let app = ctx.data_unchecked::<CalaApp>();
197 match app.ledger().tx_templates().find_by_code(code).await {
198 Ok(tx_template) => Ok(Some(tx_template.into())),
199 Err(cala_ledger::tx_template::error::TxTemplateError::CouldNotFindByCode(_)) => {
200 Ok(None)
201 }
202 Err(err) => Err(err.into()),
203 }
204 }
205
206 async fn velocity_limit(
207 &self,
208 ctx: &Context<'_>,
209 id: UUID,
210 ) -> async_graphql::Result<Option<VelocityLimit>> {
211 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
212 Ok(loader.load_one(VelocityLimitId::from(id)).await?)
213 }
214
215 async fn velocity_control(
216 &self,
217 ctx: &Context<'_>,
218 id: UUID,
219 ) -> async_graphql::Result<Option<VelocityControl>> {
220 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
221 Ok(loader.load_one(VelocityControlId::from(id)).await?)
222 }
223}
224
225#[derive(Default)]
226pub struct CoreMutation<E: MutationExtensionMarker> {
227 _phantom: std::marker::PhantomData<E>,
228}
229
230#[Object(name = "Mutation")]
231impl<E: MutationExtensionMarker> CoreMutation<E> {
232 #[graphql(flatten)]
233 async fn extension(&self) -> E {
234 E::default()
235 }
236
237 async fn account_create(
238 &self,
239 ctx: &Context<'_>,
240 input: AccountCreateInput,
241 ) -> Result<AccountCreatePayload> {
242 let app = ctx.data_unchecked::<CalaApp>();
243 let mut op = ctx
244 .data_unchecked::<DbOp>()
245 .try_lock()
246 .expect("Lock held concurrently");
247 let mut builder = cala_ledger::account::NewAccount::builder();
248 builder
249 .id(input.account_id)
250 .name(input.name)
251 .code(input.code)
252 .normal_balance_type(input.normal_balance_type)
253 .status(input.status);
254
255 if let Some(external_id) = input.external_id {
256 builder.external_id(external_id);
257 }
258 if let Some(description) = input.description {
259 builder.description(description);
260 }
261 if let Some(metadata) = input.metadata {
262 builder.metadata(metadata)?;
263 }
264 let account = app
265 .ledger()
266 .accounts()
267 .create_in_op(&mut op, builder.build()?)
268 .await?;
269
270 if let Some(account_set_ids) = input.account_set_ids {
271 for id in account_set_ids {
272 app.ledger()
273 .account_sets()
274 .add_member_in_op(&mut op, AccountSetId::from(id), account.id())
275 .await?;
276 }
277 }
278
279 Ok(account.into())
280 }
281
282 async fn account_update(
283 &self,
284 ctx: &Context<'_>,
285 id: UUID,
286 input: AccountUpdateInput,
287 ) -> Result<AccountUpdatePayload> {
288 let app = ctx.data_unchecked::<CalaApp>();
289 let mut op = ctx
290 .data_unchecked::<DbOp>()
291 .try_lock()
292 .expect("Lock held concurrently");
293
294 let mut builder = cala_ledger::account::AccountUpdate::default();
295 if let Some(name) = input.name {
296 builder.name(name);
297 }
298 if let Some(code) = input.code {
299 builder.code(code);
300 }
301 if let Some(normal_balance_type) = input.normal_balance_type {
302 builder.normal_balance_type(normal_balance_type);
303 }
304 if let Some(status) = input.status {
305 builder.status(status);
306 }
307 if let Some(external_id) = input.external_id {
308 builder.external_id(external_id);
309 }
310 if let Some(description) = input.description {
311 builder.description(description);
312 }
313 if let Some(metadata) = input.metadata {
314 builder.metadata(metadata)?;
315 }
316
317 let mut account = app.ledger().accounts().find(AccountId::from(id)).await?;
318 account.update(builder);
319 app.ledger()
320 .accounts()
321 .persist_in_op(&mut op, &mut account)
322 .await?;
323
324 Ok(account.into())
325 }
326
327 async fn account_set_create(
328 &self,
329 ctx: &Context<'_>,
330 input: AccountSetCreateInput,
331 ) -> Result<AccountSetCreatePayload> {
332 let app = ctx.data_unchecked::<CalaApp>();
333 let mut op = ctx
334 .data_unchecked::<DbOp>()
335 .try_lock()
336 .expect("Lock held concurrently");
337 let mut builder = cala_ledger::account_set::NewAccountSet::builder();
338 builder
339 .id(input.account_set_id)
340 .journal_id(input.journal_id)
341 .name(input.name)
342 .normal_balance_type(input.normal_balance_type);
343
344 if let Some(description) = input.description {
345 builder.description(description);
346 }
347 if let Some(metadata) = input.metadata {
348 builder.metadata(metadata)?;
349 }
350 let account_set = app
351 .ledger()
352 .account_sets()
353 .create_in_op(&mut op, builder.build()?)
354 .await?;
355
356 Ok(account_set.into())
357 }
358
359 async fn account_set_update(
360 &self,
361 ctx: &Context<'_>,
362 id: UUID,
363 input: AccountSetUpdateInput,
364 ) -> Result<AccountSetUpdatePayload> {
365 let app = ctx.data_unchecked::<CalaApp>();
366 let mut op = ctx
367 .data_unchecked::<DbOp>()
368 .try_lock()
369 .expect("Lock held concurrently");
370 let mut builder = cala_ledger::account_set::AccountSetUpdate::default();
371 if let Some(name) = input.name {
372 builder.name(name);
373 }
374 if let Some(desc) = input.description {
375 builder.description(desc);
376 }
377 if let Some(normal_balance_type) = input.normal_balance_type {
378 builder.normal_balance_type(normal_balance_type);
379 }
380
381 if let Some(metadata) = input.metadata {
382 builder.metadata(metadata)?;
383 }
384
385 let mut account_set = app
386 .ledger()
387 .account_sets()
388 .find(AccountSetId::from(id))
389 .await?;
390 account_set.update(builder);
391
392 app.ledger()
393 .account_sets()
394 .persist_in_op(&mut op, &mut account_set)
395 .await?;
396
397 Ok(account_set.into())
398 }
399
400 async fn add_to_account_set(
401 &self,
402 ctx: &Context<'_>,
403 input: AddToAccountSetInput,
404 ) -> Result<AddToAccountSetPayload> {
405 let app = ctx.data_unchecked::<CalaApp>();
406 let mut op = ctx
407 .data_unchecked::<DbOp>()
408 .try_lock()
409 .expect("Lock held concurrently");
410
411 let account_set = app
412 .ledger()
413 .account_sets()
414 .add_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
415 .await?;
416
417 Ok(account_set.into())
418 }
419
420 async fn remove_from_account_set(
421 &self,
422 ctx: &Context<'_>,
423 input: RemoveFromAccountSetInput,
424 ) -> Result<RemoveFromAccountSetPayload> {
425 let app = ctx.data_unchecked::<CalaApp>();
426 let mut op = ctx
427 .data_unchecked::<DbOp>()
428 .try_lock()
429 .expect("Lock held concurrently");
430
431 let account_set = app
432 .ledger()
433 .account_sets()
434 .remove_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
435 .await?;
436
437 Ok(account_set.into())
438 }
439
440 async fn journal_create(
441 &self,
442 ctx: &Context<'_>,
443 input: JournalCreateInput,
444 ) -> Result<JournalCreatePayload> {
445 let app = ctx.data_unchecked::<CalaApp>();
446 let mut op = ctx
447 .data_unchecked::<DbOp>()
448 .try_lock()
449 .expect("Lock held concurrently");
450 let mut builder = cala_ledger::journal::NewJournal::builder();
451 builder
452 .id(input.journal_id)
453 .name(input.name)
454 .status(input.status);
455 if let Some(description) = input.description {
456 builder.description(description);
457 }
458 let journal = app
459 .ledger()
460 .journals()
461 .create_in_op(&mut op, builder.build()?)
462 .await?;
463
464 Ok(journal.into())
465 }
466
467 async fn journal_update(
468 &self,
469 ctx: &Context<'_>,
470 id: UUID,
471 input: JournalUpdateInput,
472 ) -> Result<JournalUpdatePayload> {
473 let app = ctx.data_unchecked::<CalaApp>();
474 let mut op = ctx
475 .data_unchecked::<DbOp>()
476 .try_lock()
477 .expect("Lock held concurrently");
478
479 let mut builder = cala_ledger::journal::JournalUpdate::default();
480 if let Some(name) = input.name {
481 builder.name(name);
482 }
483 if let Some(status) = input.status {
484 builder.status(status);
485 }
486 if let Some(description) = input.description {
487 builder.description(description);
488 }
489
490 let mut journal = app.ledger().journals().find(JournalId::from(id)).await?;
491 journal.update(builder);
492
493 app.ledger()
494 .journals()
495 .persist_in_op(&mut op, &mut journal)
496 .await?;
497
498 Ok(journal.into())
499 }
500
501 async fn tx_template_create(
502 &self,
503 ctx: &Context<'_>,
504 input: TxTemplateCreateInput,
505 ) -> Result<TxTemplateCreatePayload> {
506 let app = ctx.data_unchecked::<CalaApp>();
507 let mut op = ctx
508 .data_unchecked::<DbOp>()
509 .try_lock()
510 .expect("Lock held concurrently");
511 let mut new_tx_template_transaction_builder =
512 cala_ledger::tx_template::NewTxTemplateTransaction::builder();
513 let TxTemplateTransactionInput {
514 effective,
515 journal_id,
516 correlation_id,
517 external_id,
518 description,
519 metadata,
520 } = input.transaction;
521 new_tx_template_transaction_builder
522 .effective(effective)
523 .journal_id(journal_id);
524 if let Some(correlation_id) = correlation_id {
525 new_tx_template_transaction_builder.correlation_id(correlation_id);
526 };
527 if let Some(external_id) = external_id {
528 new_tx_template_transaction_builder.external_id(external_id);
529 };
530 if let Some(description) = description {
531 new_tx_template_transaction_builder.description(description);
532 };
533 if let Some(metadata) = metadata {
534 new_tx_template_transaction_builder.metadata(metadata);
535 }
536 let new_transaction = new_tx_template_transaction_builder.build()?;
537
538 let mut new_params = Vec::new();
539 if let Some(params) = input.params {
540 for param in params {
541 let mut param_builder = NewParamDefinition::builder();
542 param_builder.name(param.name).r#type(param.r#type.into());
543 if let Some(default) = param.default {
544 param_builder.default_expr(default);
545 }
546 if let Some(desc) = param.description {
547 param_builder.description(desc);
548 }
549 let new_param = param_builder.build()?;
550 new_params.push(new_param);
551 }
552 }
553
554 let mut new_entries = Vec::new();
555 for entry in input.entries {
556 let TxTemplateEntryInput {
557 entry_type,
558 account_id,
559 layer,
560 direction,
561 units,
562 currency,
563 description,
564 } = entry;
565 let mut new_entry_input_builder =
566 cala_ledger::tx_template::NewTxTemplateEntry::builder();
567 new_entry_input_builder
568 .entry_type(entry_type)
569 .account_id(account_id)
570 .layer(layer)
571 .direction(direction)
572 .units(units)
573 .currency(currency);
574 if let Some(desc) = description {
575 new_entry_input_builder.description(desc);
576 }
577 let new_entry_input = new_entry_input_builder.build()?;
578 new_entries.push(new_entry_input);
579 }
580
581 let mut new_tx_template_builder = cala_ledger::tx_template::NewTxTemplate::builder();
582 new_tx_template_builder
583 .id(input.tx_template_id)
584 .code(input.code)
585 .transaction(new_transaction)
586 .params(new_params)
587 .entries(new_entries);
588 if let Some(desc) = input.description {
589 new_tx_template_builder.description(desc);
590 }
591 if let Some(metadata) = input.metadata {
592 new_tx_template_builder.metadata(metadata)?;
593 }
594 let new_tx_template = new_tx_template_builder.build()?;
595
596 let tx_template = app
597 .ledger()
598 .tx_templates()
599 .create_in_op(&mut op, new_tx_template)
600 .await?;
601
602 Ok(tx_template.into())
603 }
604
605 async fn transaction_post(
606 &self,
607 ctx: &Context<'_>,
608 input: TransactionInput,
609 ) -> Result<TransactionPostPayload> {
610 let app = ctx.data_unchecked::<CalaApp>();
611 let mut op = ctx
612 .data_unchecked::<DbOp>()
613 .try_lock()
614 .expect("Lock held concurrently");
615 let params = input.params.map(cala_ledger::tx_template::Params::from);
616 let transaction = app
617 .ledger()
618 .post_transaction_in_op(
619 &mut op,
620 input.transaction_id.into(),
621 &input.tx_template_code,
622 params.unwrap_or_default(),
623 )
624 .await?;
625 Ok(transaction.into())
626 }
627
628 async fn velocity_limit_create(
629 &self,
630 ctx: &Context<'_>,
631 input: VelocityLimitCreateInput,
632 ) -> Result<VelocityLimitCreatePayload> {
633 let app = ctx.data_unchecked::<CalaApp>();
634 let mut op = ctx
635 .data_unchecked::<DbOp>()
636 .try_lock()
637 .expect("Lock held concurrently");
638
639 let mut new_velocity_limit_builder = cala_ledger::velocity::NewVelocityLimit::builder();
640 new_velocity_limit_builder
641 .id(input.velocity_limit_id)
642 .name(input.name)
643 .description(input.description);
644
645 if let Some(condition) = input.condition {
646 new_velocity_limit_builder.condition(condition);
647 }
648
649 if let Some(currency) = input.currency {
650 new_velocity_limit_builder.currency(currency);
651 }
652
653 let mut new_window = Vec::new();
654 for partition_key_input in input.window {
655 let mut new_partition_key_builder = cala_ledger::velocity::NewPartitionKey::builder();
656 let partition_key = new_partition_key_builder
657 .alias(partition_key_input.alias)
658 .value(partition_key_input.value)
659 .build()?;
660
661 new_window.push(partition_key);
662 }
663 new_velocity_limit_builder.window(new_window);
664
665 let mut new_limit_builder = cala_ledger::velocity::NewLimit::builder();
666
667 if let Some(timestamp_source) = input.limit.timestamp_source {
668 new_limit_builder.timestamp_source(timestamp_source);
669 }
670
671 let mut new_balance_limits = Vec::new();
672 for balance_limit_input in input.limit.balance {
673 let mut new_balance_limit_builder = cala_ledger::velocity::NewBalanceLimit::builder();
674 new_balance_limit_builder
675 .limit_type(balance_limit_input.limit_type)
676 .layer(balance_limit_input.layer)
677 .amount(balance_limit_input.amount)
678 .enforcement_direction(balance_limit_input.normal_balance_type);
679
680 if let Some(start) = balance_limit_input.start {
681 new_balance_limit_builder.start(start);
682 }
683
684 if let Some(end) = balance_limit_input.end {
685 new_balance_limit_builder.end(end);
686 }
687
688 let new_balance_limit = new_balance_limit_builder.build()?;
689 new_balance_limits.push(new_balance_limit);
690 }
691 new_limit_builder.balance(new_balance_limits);
692 let new_limit = new_limit_builder.build()?;
693
694 new_velocity_limit_builder.limit(new_limit);
695
696 if let Some(params) = input.params {
697 let mut new_params = Vec::new();
698 for param in params {
699 let mut param_builder = NewParamDefinition::builder();
700 param_builder.name(param.name).r#type(param.r#type.into());
701 if let Some(default) = param.default {
702 param_builder.default_expr(default);
703 }
704 if let Some(description) = param.description {
705 param_builder.description(description);
706 }
707 let new_param = param_builder.build()?;
708 new_params.push(new_param);
709 }
710 new_velocity_limit_builder.params(new_params);
711 }
712
713 let new_velocity_limit = new_velocity_limit_builder.build()?;
714
715 let velocity_limit = app
716 .ledger()
717 .velocities()
718 .create_limit_in_op(&mut op, new_velocity_limit)
719 .await?;
720
721 Ok(velocity_limit.into())
722 }
723
724 async fn velocity_control_create(
725 &self,
726 ctx: &Context<'_>,
727 input: VelocityControlCreateInput,
728 ) -> Result<VelocityControlCreatePayload> {
729 let app = ctx.data_unchecked::<CalaApp>();
730 let mut op = ctx
731 .data_unchecked::<DbOp>()
732 .try_lock()
733 .expect("Lock held concurrently");
734
735 let mut new_velocity_control_builder = cala_ledger::velocity::NewVelocityControl::builder();
736 new_velocity_control_builder
737 .id(input.velocity_control_id)
738 .name(input.name)
739 .description(input.description);
740
741 if let Some(condition) = input.condition {
742 new_velocity_control_builder.condition(condition);
743 }
744
745 let mut new_velocity_enforcement_builder =
746 cala_ledger::velocity::NewVelocityEnforcement::builder();
747 new_velocity_enforcement_builder.action(input.enforcement.velocity_enforcement_action);
748 let new_velocity_enforcement = new_velocity_enforcement_builder.build()?;
749
750 new_velocity_control_builder.enforcement(new_velocity_enforcement);
751
752 let new_velocity_control = new_velocity_control_builder.build()?;
753 let velocity_control = app
754 .ledger()
755 .velocities()
756 .create_control_in_op(&mut op, new_velocity_control)
757 .await?;
758
759 Ok(velocity_control.into())
760 }
761
762 async fn velocity_control_add_limit(
763 &self,
764 ctx: &Context<'_>,
765 input: VelocityControlAddLimitInput,
766 ) -> Result<VelocityControlAddLimitPayload> {
767 let app = ctx.data_unchecked::<CalaApp>();
768 let mut op = ctx
769 .data_unchecked::<DbOp>()
770 .try_lock()
771 .expect("Lock held concurrently");
772
773 let velocity_limit = app
774 .ledger()
775 .velocities()
776 .add_limit_to_control_in_op(
777 &mut op,
778 input.velocity_control_id.into(),
779 input.velocity_limit_id.into(),
780 )
781 .await?;
782
783 Ok(velocity_limit.into())
784 }
785
786 async fn velocity_control_attach(
787 &self,
788 ctx: &Context<'_>,
789 input: VelocityControlAttachInput,
790 ) -> Result<VelocityControlAttachPayload> {
791 let app = ctx.data_unchecked::<CalaApp>();
792 let mut op = ctx
793 .data_unchecked::<DbOp>()
794 .try_lock()
795 .expect("Lock held concurrently");
796 let params = cala_ledger::tx_template::Params::from(input.params);
797
798 let velocity_control = app
799 .ledger()
800 .velocities()
801 .attach_control_to_account_in_op(
802 &mut op,
803 input.velocity_control_id.into(),
804 input.account_id.into(),
805 params,
806 )
807 .await?;
808
809 Ok(velocity_control.into())
810 }
811}