1use async_graphql::{dataloader::*, types::connection::*, *};
2use cala_ledger::{balance::AccountBalance, primitives::*, tx_template::NewParamDefinition};
3use std::sync::Arc;
4use tokio::sync::Mutex;
5
6use crate::{app::CalaApp, extension::*};
7
8use super::{
9 account::*, account_set::*, balance::*, journal::*, loader::*, primitives::*, transaction::*,
10 tx_template::*, velocity::*,
11};
12
13pub type DbOp<'a> = Arc<Mutex<cala_ledger::LedgerOperation<'a>>>;
14
15#[derive(Default)]
16pub struct CoreQuery<E: QueryExtensionMarker> {
17 _phantom: std::marker::PhantomData<E>,
18}
19
20#[Object(name = "Query")]
21impl<E: QueryExtensionMarker> CoreQuery<E> {
22 #[graphql(flatten)]
23 async fn extension(&self) -> E {
24 E::default()
25 }
26
27 async fn account(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Account>> {
28 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
29 Ok(loader.load_one(AccountId::from(id)).await?)
30 }
31
32 async fn account_by_external_id(
33 &self,
34 ctx: &Context<'_>,
35 external_id: String,
36 ) -> async_graphql::Result<Option<Account>> {
37 let app = ctx.data_unchecked::<CalaApp>();
38 match app
39 .ledger()
40 .accounts()
41 .find_by_external_id(external_id)
42 .await
43 {
44 Ok(account) => Ok(Some(account.into())),
45 Err(cala_ledger::account::error::AccountError::CouldNotFindByExternalId(_)) => Ok(None),
46 Err(err) => Err(err.into()),
47 }
48 }
49
50 async fn account_by_code(
51 &self,
52 ctx: &Context<'_>,
53 code: String,
54 ) -> async_graphql::Result<Option<Account>> {
55 let app = ctx.data_unchecked::<CalaApp>();
56 match app.ledger().accounts().find_by_code(code).await {
57 Ok(account) => Ok(Some(account.into())),
58 Err(cala_ledger::account::error::AccountError::CouldNotFindByCode(_)) => Ok(None),
59 Err(err) => Err(err.into()),
60 }
61 }
62
63 async fn accounts(
64 &self,
65 ctx: &Context<'_>,
66 first: i32,
67 after: Option<String>,
68 ) -> Result<Connection<AccountsByNameCursor, Account, EmptyFields, EmptyFields>> {
69 let app = ctx.data_unchecked::<CalaApp>();
70 query(
71 after,
72 None,
73 Some(first),
74 None,
75 |after, _, first, _| async move {
76 let first = first.expect("First always exists");
77 let result = app
78 .ledger()
79 .accounts()
80 .list(cala_ledger::es_entity::PaginatedQueryArgs { first, after })
81 .await?;
82 let mut connection = Connection::new(false, result.has_next_page);
83 connection
84 .edges
85 .extend(result.entities.into_iter().map(|entity| {
86 let cursor = AccountsByNameCursor::from(&entity);
87 Edge::new(cursor, Account::from(entity))
88 }));
89 Ok::<_, async_graphql::Error>(connection)
90 },
91 )
92 .await
93 }
94
95 async fn account_set(
96 &self,
97 ctx: &Context<'_>,
98 id: UUID,
99 ) -> async_graphql::Result<Option<AccountSet>> {
100 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
101 Ok(loader.load_one(AccountSetId::from(id)).await?)
102 }
103
104 async fn journal(&self, ctx: &Context<'_>, id: UUID) -> async_graphql::Result<Option<Journal>> {
105 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
106 Ok(loader.load_one(JournalId::from(id)).await?)
107 }
108
109 async fn balance(
110 &self,
111 ctx: &Context<'_>,
112 journal_id: UUID,
113 account_id: UUID,
114 currency: CurrencyCode,
115 ) -> async_graphql::Result<Option<Balance>> {
116 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
117 let journal_id = JournalId::from(journal_id);
118 let account_id = AccountId::from(account_id);
119 let currency = Currency::from(currency);
120 let balance: Option<AccountBalance> =
121 loader.load_one((journal_id, account_id, currency)).await?;
122 Ok(balance.map(Balance::from))
123 }
124
125 async fn transaction(
126 &self,
127 ctx: &Context<'_>,
128 id: UUID,
129 ) -> async_graphql::Result<Option<Transaction>> {
130 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
131 Ok(loader.load_one(TransactionId::from(id)).await?)
132 }
133
134 async fn transaction_by_external_id(
135 &self,
136 ctx: &Context<'_>,
137 external_id: String,
138 ) -> async_graphql::Result<Option<Transaction>> {
139 let app = ctx.data_unchecked::<CalaApp>();
140 match app
141 .ledger()
142 .transactions()
143 .find_by_external_id(external_id)
144 .await
145 {
146 Ok(transaction) => Ok(Some(transaction.into())),
147 Err(cala_ledger::transaction::error::TransactionError::CouldNotFindByExternalId(_)) => {
148 Ok(None)
149 }
150 Err(err) => Err(err.into()),
151 }
152 }
153
154 async fn tx_template(
155 &self,
156 ctx: &Context<'_>,
157 id: UUID,
158 ) -> async_graphql::Result<Option<TxTemplate>> {
159 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
160 Ok(loader.load_one(TxTemplateId::from(id)).await?)
161 }
162
163 async fn tx_template_by_code(
164 &self,
165 ctx: &Context<'_>,
166 code: String,
167 ) -> async_graphql::Result<Option<TxTemplate>> {
168 let app = ctx.data_unchecked::<CalaApp>();
169 match app.ledger().tx_templates().find_by_code(code).await {
170 Ok(tx_template) => Ok(Some(tx_template.into())),
171 Err(cala_ledger::tx_template::error::TxTemplateError::CouldNotFindByCode(_)) => {
172 Ok(None)
173 }
174 Err(err) => Err(err.into()),
175 }
176 }
177
178 async fn velocity_limit(
179 &self,
180 ctx: &Context<'_>,
181 id: UUID,
182 ) -> async_graphql::Result<Option<VelocityLimit>> {
183 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
184 Ok(loader.load_one(VelocityLimitId::from(id)).await?)
185 }
186
187 async fn velocity_control(
188 &self,
189 ctx: &Context<'_>,
190 id: UUID,
191 ) -> async_graphql::Result<Option<VelocityControl>> {
192 let loader = ctx.data_unchecked::<DataLoader<LedgerDataLoader>>();
193 Ok(loader.load_one(VelocityControlId::from(id)).await?)
194 }
195}
196
197#[derive(Default)]
198pub struct CoreMutation<E: MutationExtensionMarker> {
199 _phantom: std::marker::PhantomData<E>,
200}
201
202#[Object(name = "Mutation")]
203impl<E: MutationExtensionMarker> CoreMutation<E> {
204 #[graphql(flatten)]
205 async fn extension(&self) -> E {
206 E::default()
207 }
208
209 async fn account_create(
210 &self,
211 ctx: &Context<'_>,
212 input: AccountCreateInput,
213 ) -> Result<AccountCreatePayload> {
214 let app = ctx.data_unchecked::<CalaApp>();
215 let mut op = ctx
216 .data_unchecked::<DbOp>()
217 .try_lock()
218 .expect("Lock held concurrently");
219 let mut builder = cala_ledger::account::NewAccount::builder();
220 builder
221 .id(input.account_id)
222 .name(input.name)
223 .code(input.code)
224 .normal_balance_type(input.normal_balance_type)
225 .status(input.status);
226
227 if let Some(external_id) = input.external_id {
228 builder.external_id(external_id);
229 }
230 if let Some(description) = input.description {
231 builder.description(description);
232 }
233 if let Some(metadata) = input.metadata {
234 builder.metadata(metadata)?;
235 }
236 let account = app
237 .ledger()
238 .accounts()
239 .create_in_op(&mut op, builder.build()?)
240 .await?;
241
242 if let Some(account_set_ids) = input.account_set_ids {
243 for id in account_set_ids {
244 app.ledger()
245 .account_sets()
246 .add_member_in_op(&mut op, AccountSetId::from(id), account.id())
247 .await?;
248 }
249 }
250
251 Ok(account.into())
252 }
253
254 async fn account_update(
255 &self,
256 ctx: &Context<'_>,
257 id: UUID,
258 input: AccountUpdateInput,
259 ) -> Result<AccountUpdatePayload> {
260 let app = ctx.data_unchecked::<CalaApp>();
261 let mut op = ctx
262 .data_unchecked::<DbOp>()
263 .try_lock()
264 .expect("Lock held concurrently");
265
266 let mut builder = cala_ledger::account::AccountUpdate::default();
267 if let Some(name) = input.name {
268 builder.name(name);
269 }
270 if let Some(code) = input.code {
271 builder.code(code);
272 }
273 if let Some(normal_balance_type) = input.normal_balance_type {
274 builder.normal_balance_type(normal_balance_type);
275 }
276 if let Some(status) = input.status {
277 builder.status(status);
278 }
279 if let Some(external_id) = input.external_id {
280 builder.external_id(external_id);
281 }
282 if let Some(description) = input.description {
283 builder.description(description);
284 }
285 if let Some(metadata) = input.metadata {
286 builder.metadata(metadata)?;
287 }
288
289 let mut account = app.ledger().accounts().find(AccountId::from(id)).await?;
290 account.update(builder);
291 app.ledger()
292 .accounts()
293 .persist_in_op(&mut op, &mut account)
294 .await?;
295
296 Ok(account.into())
297 }
298
299 async fn account_set_create(
300 &self,
301 ctx: &Context<'_>,
302 input: AccountSetCreateInput,
303 ) -> Result<AccountSetCreatePayload> {
304 let app = ctx.data_unchecked::<CalaApp>();
305 let mut op = ctx
306 .data_unchecked::<DbOp>()
307 .try_lock()
308 .expect("Lock held concurrently");
309 let mut builder = cala_ledger::account_set::NewAccountSet::builder();
310 builder
311 .id(input.account_set_id)
312 .journal_id(input.journal_id)
313 .name(input.name)
314 .normal_balance_type(input.normal_balance_type);
315
316 if let Some(description) = input.description {
317 builder.description(description);
318 }
319 if let Some(metadata) = input.metadata {
320 builder.metadata(metadata)?;
321 }
322 let account_set = app
323 .ledger()
324 .account_sets()
325 .create_in_op(&mut op, builder.build()?)
326 .await?;
327
328 Ok(account_set.into())
329 }
330
331 async fn account_set_update(
332 &self,
333 ctx: &Context<'_>,
334 id: UUID,
335 input: AccountSetUpdateInput,
336 ) -> Result<AccountSetUpdatePayload> {
337 let app = ctx.data_unchecked::<CalaApp>();
338 let mut op = ctx
339 .data_unchecked::<DbOp>()
340 .try_lock()
341 .expect("Lock held concurrently");
342 let mut builder = cala_ledger::account_set::AccountSetUpdate::default();
343 if let Some(name) = input.name {
344 builder.name(name);
345 }
346 if let Some(desc) = input.description {
347 builder.description(desc);
348 }
349 if let Some(normal_balance_type) = input.normal_balance_type {
350 builder.normal_balance_type(normal_balance_type);
351 }
352
353 if let Some(metadata) = input.metadata {
354 builder.metadata(metadata)?;
355 }
356
357 let mut account_set = app
358 .ledger()
359 .account_sets()
360 .find(AccountSetId::from(id))
361 .await?;
362 account_set.update(builder);
363
364 app.ledger()
365 .account_sets()
366 .persist_in_op(&mut op, &mut account_set)
367 .await?;
368
369 Ok(account_set.into())
370 }
371
372 async fn add_to_account_set(
373 &self,
374 ctx: &Context<'_>,
375 input: AddToAccountSetInput,
376 ) -> Result<AddToAccountSetPayload> {
377 let app = ctx.data_unchecked::<CalaApp>();
378 let mut op = ctx
379 .data_unchecked::<DbOp>()
380 .try_lock()
381 .expect("Lock held concurrently");
382
383 let account_set = app
384 .ledger()
385 .account_sets()
386 .add_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
387 .await?;
388
389 Ok(account_set.into())
390 }
391
392 async fn remove_from_account_set(
393 &self,
394 ctx: &Context<'_>,
395 input: RemoveFromAccountSetInput,
396 ) -> Result<RemoveFromAccountSetPayload> {
397 let app = ctx.data_unchecked::<CalaApp>();
398 let mut op = ctx
399 .data_unchecked::<DbOp>()
400 .try_lock()
401 .expect("Lock held concurrently");
402
403 let account_set = app
404 .ledger()
405 .account_sets()
406 .remove_member_in_op(&mut op, AccountSetId::from(input.account_set_id), input)
407 .await?;
408
409 Ok(account_set.into())
410 }
411
412 async fn journal_create(
413 &self,
414 ctx: &Context<'_>,
415 input: JournalCreateInput,
416 ) -> Result<JournalCreatePayload> {
417 let app = ctx.data_unchecked::<CalaApp>();
418 let mut op = ctx
419 .data_unchecked::<DbOp>()
420 .try_lock()
421 .expect("Lock held concurrently");
422 let mut builder = cala_ledger::journal::NewJournal::builder();
423 builder
424 .id(input.journal_id)
425 .name(input.name)
426 .status(input.status);
427 if let Some(description) = input.description {
428 builder.description(description);
429 }
430 let journal = app
431 .ledger()
432 .journals()
433 .create_in_op(&mut op, builder.build()?)
434 .await?;
435
436 Ok(journal.into())
437 }
438
439 async fn journal_update(
440 &self,
441 ctx: &Context<'_>,
442 id: UUID,
443 input: JournalUpdateInput,
444 ) -> Result<JournalUpdatePayload> {
445 let app = ctx.data_unchecked::<CalaApp>();
446 let mut op = ctx
447 .data_unchecked::<DbOp>()
448 .try_lock()
449 .expect("Lock held concurrently");
450
451 let mut builder = cala_ledger::journal::JournalUpdate::default();
452 if let Some(name) = input.name {
453 builder.name(name);
454 }
455 if let Some(status) = input.status {
456 builder.status(status);
457 }
458 if let Some(description) = input.description {
459 builder.description(description);
460 }
461
462 let mut journal = app.ledger().journals().find(JournalId::from(id)).await?;
463 journal.update(builder);
464
465 app.ledger()
466 .journals()
467 .persist_in_op(&mut op, &mut journal)
468 .await?;
469
470 Ok(journal.into())
471 }
472
473 async fn tx_template_create(
474 &self,
475 ctx: &Context<'_>,
476 input: TxTemplateCreateInput,
477 ) -> Result<TxTemplateCreatePayload> {
478 let app = ctx.data_unchecked::<CalaApp>();
479 let mut op = ctx
480 .data_unchecked::<DbOp>()
481 .try_lock()
482 .expect("Lock held concurrently");
483 let mut new_tx_template_transaction_builder =
484 cala_ledger::tx_template::NewTxTemplateTransaction::builder();
485 let TxTemplateTransactionInput {
486 effective,
487 journal_id,
488 correlation_id,
489 external_id,
490 description,
491 metadata,
492 } = input.transaction;
493 new_tx_template_transaction_builder
494 .effective(effective)
495 .journal_id(journal_id);
496 if let Some(correlation_id) = correlation_id {
497 new_tx_template_transaction_builder.correlation_id(correlation_id);
498 };
499 if let Some(external_id) = external_id {
500 new_tx_template_transaction_builder.external_id(external_id);
501 };
502 if let Some(description) = description {
503 new_tx_template_transaction_builder.description(description);
504 };
505 if let Some(metadata) = metadata {
506 new_tx_template_transaction_builder.metadata(metadata);
507 }
508 let new_transaction = new_tx_template_transaction_builder.build()?;
509
510 let mut new_params = Vec::new();
511 if let Some(params) = input.params {
512 for param in params {
513 let mut param_builder = NewParamDefinition::builder();
514 param_builder.name(param.name).r#type(param.r#type.into());
515 if let Some(default) = param.default {
516 param_builder.default_expr(default);
517 }
518 if let Some(desc) = param.description {
519 param_builder.description(desc);
520 }
521 let new_param = param_builder.build()?;
522 new_params.push(new_param);
523 }
524 }
525
526 let mut new_entries = Vec::new();
527 for entry in input.entries {
528 let TxTemplateEntryInput {
529 entry_type,
530 account_id,
531 layer,
532 direction,
533 units,
534 currency,
535 description,
536 } = entry;
537 let mut new_entry_input_builder =
538 cala_ledger::tx_template::NewTxTemplateEntry::builder();
539 new_entry_input_builder
540 .entry_type(entry_type)
541 .account_id(account_id)
542 .layer(layer)
543 .direction(direction)
544 .units(units)
545 .currency(currency);
546 if let Some(desc) = description {
547 new_entry_input_builder.description(desc);
548 }
549 let new_entry_input = new_entry_input_builder.build()?;
550 new_entries.push(new_entry_input);
551 }
552
553 let mut new_tx_template_builder = cala_ledger::tx_template::NewTxTemplate::builder();
554 new_tx_template_builder
555 .id(input.tx_template_id)
556 .code(input.code)
557 .transaction(new_transaction)
558 .params(new_params)
559 .entries(new_entries);
560 if let Some(desc) = input.description {
561 new_tx_template_builder.description(desc);
562 }
563 if let Some(metadata) = input.metadata {
564 new_tx_template_builder.metadata(metadata)?;
565 }
566 let new_tx_template = new_tx_template_builder.build()?;
567
568 let tx_template = app
569 .ledger()
570 .tx_templates()
571 .create_in_op(&mut op, new_tx_template)
572 .await?;
573
574 Ok(tx_template.into())
575 }
576
577 async fn transaction_post(
578 &self,
579 ctx: &Context<'_>,
580 input: TransactionInput,
581 ) -> Result<TransactionPostPayload> {
582 let app = ctx.data_unchecked::<CalaApp>();
583 let mut op = ctx
584 .data_unchecked::<DbOp>()
585 .try_lock()
586 .expect("Lock held concurrently");
587 let params = input.params.map(cala_ledger::tx_template::Params::from);
588 let transaction = app
589 .ledger()
590 .post_transaction_in_op(
591 &mut op,
592 input.transaction_id.into(),
593 &input.tx_template_code,
594 params.unwrap_or_default(),
595 )
596 .await?;
597 Ok(transaction.into())
598 }
599
600 async fn velocity_limit_create(
601 &self,
602 ctx: &Context<'_>,
603 input: VelocityLimitCreateInput,
604 ) -> Result<VelocityLimitCreatePayload> {
605 let app = ctx.data_unchecked::<CalaApp>();
606 let mut op = ctx
607 .data_unchecked::<DbOp>()
608 .try_lock()
609 .expect("Lock held concurrently");
610
611 let mut new_velocity_limit_builder = cala_ledger::velocity::NewVelocityLimit::builder();
612 new_velocity_limit_builder
613 .id(input.velocity_limit_id)
614 .name(input.name)
615 .description(input.description);
616
617 if let Some(condition) = input.condition {
618 new_velocity_limit_builder.condition(condition);
619 }
620
621 if let Some(currency) = input.currency {
622 new_velocity_limit_builder.currency(currency);
623 }
624
625 let mut new_window = Vec::new();
626 for partition_key_input in input.window {
627 let mut new_partition_key_builder = cala_ledger::velocity::NewPartitionKey::builder();
628 let partition_key = new_partition_key_builder
629 .alias(partition_key_input.alias)
630 .value(partition_key_input.value)
631 .build()?;
632
633 new_window.push(partition_key);
634 }
635 new_velocity_limit_builder.window(new_window);
636
637 let mut new_limit_builder = cala_ledger::velocity::NewLimit::builder();
638
639 if let Some(timestamp_source) = input.limit.timestamp_source {
640 new_limit_builder.timestamp_source(timestamp_source);
641 }
642
643 let mut new_balance_limits = Vec::new();
644 for balance_limit_input in input.limit.balance {
645 let mut new_balance_limit_builder = cala_ledger::velocity::NewBalanceLimit::builder();
646 new_balance_limit_builder
647 .limit_type(balance_limit_input.limit_type)
648 .layer(balance_limit_input.layer)
649 .amount(balance_limit_input.amount)
650 .enforcement_direction(balance_limit_input.normal_balance_type);
651
652 if let Some(start) = balance_limit_input.start {
653 new_balance_limit_builder.start(start);
654 }
655
656 if let Some(end) = balance_limit_input.end {
657 new_balance_limit_builder.end(end);
658 }
659
660 let new_balance_limit = new_balance_limit_builder.build()?;
661 new_balance_limits.push(new_balance_limit);
662 }
663 new_limit_builder.balance(new_balance_limits);
664 let new_limit = new_limit_builder.build()?;
665
666 new_velocity_limit_builder.limit(new_limit);
667
668 if let Some(params) = input.params {
669 let mut new_params = Vec::new();
670 for param in params {
671 let mut param_builder = NewParamDefinition::builder();
672 param_builder.name(param.name).r#type(param.r#type.into());
673 if let Some(default) = param.default {
674 param_builder.default_expr(default);
675 }
676 if let Some(description) = param.description {
677 param_builder.description(description);
678 }
679 let new_param = param_builder.build()?;
680 new_params.push(new_param);
681 }
682 new_velocity_limit_builder.params(new_params);
683 }
684
685 let new_velocity_limit = new_velocity_limit_builder.build()?;
686
687 let velocity_limit = app
688 .ledger()
689 .velocities()
690 .create_limit_in_op(&mut op, new_velocity_limit)
691 .await?;
692
693 Ok(velocity_limit.into())
694 }
695
696 async fn velocity_control_create(
697 &self,
698 ctx: &Context<'_>,
699 input: VelocityControlCreateInput,
700 ) -> Result<VelocityControlCreatePayload> {
701 let app = ctx.data_unchecked::<CalaApp>();
702 let mut op = ctx
703 .data_unchecked::<DbOp>()
704 .try_lock()
705 .expect("Lock held concurrently");
706
707 let mut new_velocity_control_builder = cala_ledger::velocity::NewVelocityControl::builder();
708 new_velocity_control_builder
709 .id(input.velocity_control_id)
710 .name(input.name)
711 .description(input.description);
712
713 if let Some(condition) = input.condition {
714 new_velocity_control_builder.condition(condition);
715 }
716
717 let mut new_velocity_enforcement_builder =
718 cala_ledger::velocity::NewVelocityEnforcement::builder();
719 new_velocity_enforcement_builder.action(input.enforcement.velocity_enforcement_action);
720 let new_velocity_enforcement = new_velocity_enforcement_builder.build()?;
721
722 new_velocity_control_builder.enforcement(new_velocity_enforcement);
723
724 let new_velocity_control = new_velocity_control_builder.build()?;
725 let velocity_control = app
726 .ledger()
727 .velocities()
728 .create_control_in_op(&mut op, new_velocity_control)
729 .await?;
730
731 Ok(velocity_control.into())
732 }
733
734 async fn velocity_control_add_limit(
735 &self,
736 ctx: &Context<'_>,
737 input: VelocityControlAddLimitInput,
738 ) -> Result<VelocityControlAddLimitPayload> {
739 let app = ctx.data_unchecked::<CalaApp>();
740 let mut op = ctx
741 .data_unchecked::<DbOp>()
742 .try_lock()
743 .expect("Lock held concurrently");
744
745 let velocity_limit = app
746 .ledger()
747 .velocities()
748 .add_limit_to_control_in_op(
749 &mut op,
750 input.velocity_control_id.into(),
751 input.velocity_limit_id.into(),
752 )
753 .await?;
754
755 Ok(velocity_limit.into())
756 }
757
758 async fn velocity_control_attach(
759 &self,
760 ctx: &Context<'_>,
761 input: VelocityControlAttachInput,
762 ) -> Result<VelocityControlAttachPayload> {
763 let app = ctx.data_unchecked::<CalaApp>();
764 let mut op = ctx
765 .data_unchecked::<DbOp>()
766 .try_lock()
767 .expect("Lock held concurrently");
768 let params = cala_ledger::tx_template::Params::from(input.params);
769
770 let velocity_control = app
771 .ledger()
772 .velocities()
773 .attach_control_to_account_in_op(
774 &mut op,
775 input.velocity_control_id.into(),
776 input.account_id.into(),
777 params,
778 )
779 .await?;
780
781 Ok(velocity_control.into())
782 }
783}