1#![cfg_attr(not(test), no_std)]
4#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
5#![cfg_attr(test, deny(clippy::assertions_on_result_states))]
6#![cfg_attr(
7 not(test),
8 deny(
9 missing_docs,
10 rust_2018_idioms,
11 clippy::string_to_string,
12 clippy::std_instead_of_core,
13 clippy::string_add,
14 clippy::str_to_string,
15 clippy::infinite_loop,
16 clippy::unwrap_used,
17 clippy::expect_used,
18 clippy::panic,
19 clippy::cfg_not_test,
20 clippy::as_conversions,
21 clippy::alloc_instead_of_core,
22 clippy::float_arithmetic,
23 clippy::empty_docs,
24 clippy::empty_line_after_doc_comments,
25 clippy::empty_line_after_outer_attr,
26 clippy::suspicious_doc_comments,
27 clippy::redundant_locals,
28 clippy::redundant_comparisons,
29 clippy::out_of_bounds_indexing,
30 clippy::empty_loop,
31 clippy::cast_sign_loss,
32 clippy::cast_possible_truncation,
33 clippy::cast_possible_wrap,
34 clippy::cast_lossless,
35 clippy::arithmetic_side_effects,
36 clippy::dbg_macro,
37 clippy::print_stdout,
38 clippy::print_stderr,
39 clippy::shadow_unrelated,
40 clippy::useless_attribute,
41 clippy::zero_repeat_side_effects,
42 clippy::builtin_type_shadow,
43 clippy::unreachable
44 )
45)]
46
47extern crate alloc;
48
49mod msg;
50mod state;
51#[cfg(test)]
52pub(crate) mod tests;
53
54use alloc::format;
55use alloc::vec::Vec;
56use core::fmt;
57use core::num::NonZeroU8;
58
59use either::*;
60use ibc_app_transfer_types::msgs::transfer::MsgTransfer;
61use ibc_app_transfer_types::packet::PacketData;
62use ibc_app_transfer_types::{Coin, PrefixedDenom, TracePrefix};
63use ibc_core_channel_types::acknowledgement::{
64 Acknowledgement, AcknowledgementStatus, StatusValue as AckStatusValue,
65};
66use ibc_core_channel_types::channel::{Counterparty, Order};
67use ibc_core_channel_types::error::ChannelError;
68use ibc_core_channel_types::packet::Packet;
69use ibc_core_channel_types::timeout::{TimeoutHeight, TimeoutTimestamp};
70use ibc_core_channel_types::Version;
71use ibc_core_host_types::identifiers::{ChannelId, ConnectionId, PortId, Sequence};
72use ibc_core_router::module::Module as IbcCoreModule;
73use ibc_core_router_types::event::{ModuleEvent, ModuleEventAttribute};
74use ibc_core_router_types::module::ModuleExtras;
75use ibc_middleware_module::MiddlewareModule;
76use ibc_middleware_module_macros::from_middleware;
77use ibc_primitives::prelude::*;
78use ibc_primitives::Signer;
79
80#[doc(inline)]
81pub use self::msg::{Duration, ForwardMetadata, PacketMetadata};
82#[doc(inline)]
83pub use self::state::{InFlightPacket, InFlightPacketKey};
84
85pub type RetryInFlightPacket = InFlightPacket;
87
88struct NewInFlightPacket<'pkt> {
89 src_packet: &'pkt Packet,
90 original_sender: Signer,
91 retries: Option<NonZeroU8>,
92 timeout: dur::Duration,
93}
94
95enum RetryOutcome {
96 GoAhead,
98 MaxRetriesExceeded,
100}
101
102#[derive(Debug)]
103enum MiddlewareError {
104 Message(String),
106 ForwardToNextMiddleware,
108}
109
110const MODULE: &str = "packet-forward-middleware";
112
113const DEFAULT_FORWARD_TIMEOUT: dur::Duration = {
115 const DURATION_IN_SECS: u128 = 5 * 60;
116 dur::Duration::from_secs(DURATION_IN_SECS)
117};
118
119const DEFAULT_FORWARD_RETRIES: NonZeroU8 = unsafe { NonZeroU8::new_unchecked(1) };
121
122pub trait PfmContext {
124 type Error: fmt::Display;
126
127 fn send_transfer_execute(&mut self, msg: MsgTransfer) -> Result<Sequence, Self::Error>;
130
131 fn receive_refund_execute(
134 &mut self,
135 packet_forwarded_by_pfm_to_next_hop: &Packet,
136 transfer_forwarded_by_pfm_to_next_hop: PacketData,
137 ) -> Result<(), Self::Error>;
138
139 fn send_refund_execute(
142 &mut self,
143 packet_from_previous_hop_sent_to_pfm: &InFlightPacket,
144 ) -> Result<(), Self::Error>;
145
146 fn write_ack_and_events(
148 &mut self,
149 packet: &Packet,
150 acknowledgement: &Acknowledgement,
151 ) -> Result<(), Self::Error>;
152
153 fn override_receiver(
160 &self,
161 channel: &ChannelId,
162 original_sender: &Signer,
163 ) -> Result<Signer, Self::Error>;
164
165 fn timeout_timestamp(
168 &self,
169 timeout_duration: dur::Duration,
170 ) -> Result<TimeoutTimestamp, Self::Error>;
171
172 fn store_inflight_packet(
175 &mut self,
176 key: InFlightPacketKey,
177 inflight_packet: InFlightPacket,
178 ) -> Result<(), Self::Error>;
179
180 fn retrieve_inflight_packet(
182 &self,
183 key: &InFlightPacketKey,
184 ) -> Result<Option<InFlightPacket>, Self::Error>;
185
186 fn delete_inflight_packet(&mut self, key: &InFlightPacketKey) -> Result<(), Self::Error>;
188
189 fn get_denom_for_this_chain(
192 &self,
193 this_chain_port: &PortId,
194 this_chain_chan: &ChannelId,
195 source_port: &PortId,
196 source_chan: &ChannelId,
197 source_denom: &PrefixedDenom,
198 ) -> Result<PrefixedDenom, Self::Error> {
199 let mut new_denom = source_denom.clone();
200
201 let source_chain_prefix = TracePrefix::new(source_port.clone(), source_chan.clone());
202
203 if source_denom.trace_path.starts_with(&source_chain_prefix) {
204 new_denom.trace_path.remove_prefix(&source_chain_prefix);
205 } else {
206 new_denom.trace_path.add_prefix(TracePrefix::new(
207 this_chain_port.clone(),
208 this_chain_chan.clone(),
209 ));
210 }
211
212 Ok(new_denom)
213 }
214}
215
216#[derive(Debug)]
219pub struct PacketForwardMiddleware<M> {
220 next: M,
221}
222
223impl<M> PacketForwardMiddleware<M> {
224 pub fn next(&self) -> &M {
226 &self.next
227 }
228
229 pub fn next_mut(&mut self) -> &mut M {
231 &mut self.next
232 }
233
234 pub const fn wrap(next: M) -> Self {
236 Self { next }
237 }
238}
239
240impl<M> PacketForwardMiddleware<M>
241where
242 M: IbcCoreModule + PfmContext,
243{
244 fn forward_transfer_packet(
245 &mut self,
246 extras: &mut ModuleExtras,
247 packet: Either<&Packet, RetryInFlightPacket>,
248 fwd_metadata: msg::ForwardMetadata,
249 original_sender: Signer,
250 override_receiver: Signer,
251 token_and_amount: Coin<PrefixedDenom>,
252 ) -> Result<(), MiddlewareError> {
253 let timeout = fwd_metadata
254 .timeout
255 .map_or(DEFAULT_FORWARD_TIMEOUT, |msg::Duration(d)| d);
256 let retries = get_retries_left_for_new_pkt(fwd_metadata.retries);
257
258 emit_event_with_attrs(extras, {
259 let mut attributes = Vec::with_capacity(8);
260
261 push_event_attr(
262 &mut attributes,
263 "is-retry".to_owned(),
264 packet.is_right().to_string(),
265 );
266 push_event_attr(
267 &mut attributes,
268 "escrow-account".to_owned(),
269 override_receiver.to_string(),
270 );
271 push_event_attr(
272 &mut attributes,
273 "sender".to_owned(),
274 original_sender.to_string(),
275 );
276 push_event_attr(
277 &mut attributes,
278 "receiver".to_owned(),
279 fwd_metadata.receiver.to_string(),
280 );
281 push_event_attr(
282 &mut attributes,
283 "port".to_owned(),
284 fwd_metadata.port.to_string(),
285 );
286 push_event_attr(
287 &mut attributes,
288 "channel".to_owned(),
289 fwd_metadata.channel.to_string(),
290 );
291
292 attributes
293 });
294
295 let next_memo = fwd_metadata
296 .next
297 .as_ref()
298 .map(|next| {
299 serde_json::to_string(next).map_err(|err| {
300 MiddlewareError::Message(format!("Failed to encode next memo: {err}"))
301 })
302 })
303 .transpose()?
304 .unwrap_or_default()
305 .into();
306
307 let fwd_msg_transfer = MsgTransfer {
308 port_id_on_a: fwd_metadata.port.clone(),
309 chan_id_on_a: fwd_metadata.channel.clone(),
310 timeout_height_on_b: TimeoutHeight::Never,
311 timeout_timestamp_on_b: self.next.timeout_timestamp(timeout).map_err(|err| {
312 MiddlewareError::Message(format!(
313 "Failed to get timeout timestamp for fwd msg transfer: {err}"
314 ))
315 })?,
316 packet_data: PacketData {
317 sender: override_receiver,
318 receiver: fwd_metadata.receiver,
319 token: token_and_amount,
320 memo: next_memo,
321 },
322 };
323
324 let sequence = self
325 .next
326 .send_transfer_execute(fwd_msg_transfer)
327 .map_err(|err| {
328 MiddlewareError::Message(format!("Failed to send forward packet: {err}"))
329 })?;
330
331 self.next
332 .store_inflight_packet(
333 InFlightPacketKey {
334 port: fwd_metadata.port,
335 channel: fwd_metadata.channel,
336 sequence,
337 },
338 next_inflight_packet(packet.map_left(|src_packet| NewInFlightPacket {
339 src_packet,
340 original_sender,
341 retries,
342 timeout,
343 })),
344 )
345 .map_err(|err| {
346 MiddlewareError::Message(format!("Failed to store in-flight packet: {err}"))
347 })?;
348
349 emit_event_with_attrs(
350 extras,
351 vec![event_attr(
352 "info".to_owned(),
353 "Packet has been successfully forwarded".to_owned(),
354 )],
355 );
356
357 Ok(())
358 }
359
360 fn receive_funds(
361 &mut self,
362 extras: &mut ModuleExtras,
363 source_packet: &Packet,
364 source_transfer_pkt: PacketData,
365 override_receiver: Signer,
366 relayer: &Signer,
367 ) -> Result<(), MiddlewareError> {
368 let override_packet = {
369 let ics20_packet_data = PacketData {
370 receiver: override_receiver,
371 memo: extract_next_memo_from_pfm_packet(&source_transfer_pkt).into(),
372 ..source_transfer_pkt
373 };
374
375 let encoded_packet_data = serde_json::to_vec(&ics20_packet_data).map_err(|err| {
376 MiddlewareError::Message(format!("Failed to encode ICS-20 packet: {err}"))
377 })?;
378
379 Packet {
380 data: encoded_packet_data,
381 ..source_packet.clone()
382 }
383 };
384
385 let maybe_ack = {
386 let (next_extras, maybe_ack) =
387 self.next.on_recv_packet_execute(&override_packet, relayer);
388 join_module_extras(extras, next_extras);
389 maybe_ack
390 };
391
392 let Some(ack) = maybe_ack else {
393 return Err(MiddlewareError::Message("Ack is nil".to_owned()));
394 };
395
396 let ack: AcknowledgementStatus = serde_json::from_slice(ack.as_bytes())
397 .map_err(|err| MiddlewareError::Message(format!("Failed to parse ack: {err}")))?;
398
399 if ack.is_successful() {
400 Ok(())
401 } else {
402 Err(MiddlewareError::Message(format!("Ack error: {ack}")))
403 }
404 }
405
406 fn write_acknowledgement_for_forwarded_packet(
407 &mut self,
408 packet: &Packet,
409 transfer_pkt: PacketData,
410 inflight_packet: InFlightPacket,
411 acknowledgement: &Acknowledgement,
412 ) -> Result<(), MiddlewareError> {
413 let ack: AcknowledgementStatus = serde_json::from_slice(acknowledgement.as_bytes())
414 .map_err(|err| MiddlewareError::Message(format!("Failed to parse ack: {err}")))?;
415
416 if !ack.is_successful() {
417 self.next
418 .receive_refund_execute(packet, transfer_pkt)
419 .map_err(|err| {
420 MiddlewareError::Message(format!(
421 "Failed to refund transfer sent to next hop: {err}"
422 ))
423 })?;
424
425 self.next
426 .send_refund_execute(&inflight_packet)
427 .map_err(|err| {
428 MiddlewareError::Message(format!(
429 "Failed to refund transfer received from previous hop: {err}"
430 ))
431 })?;
432 }
433
434 self.next
435 .write_ack_and_events(&inflight_packet.into(), acknowledgement)
436 .map_err(|err| {
437 MiddlewareError::Message(format!(
438 "Failed to write acknowledgement of in-flight packet: {err}"
439 ))
440 })?;
441
442 Ok(())
443 }
444
445 fn timeout_should_retry(
446 &self,
447 packet: &Packet,
448 ) -> Result<(RetryOutcome, InFlightPacket), MiddlewareError> {
449 let inflight_packet_key = InFlightPacketKey {
450 port: packet.port_id_on_a.clone(),
451 channel: packet.chan_id_on_a.clone(),
452 sequence: packet.seq_on_a,
453 };
454
455 let inflight_packet = self
456 .next
457 .retrieve_inflight_packet(&inflight_packet_key)
458 .map_err(|err| {
459 MiddlewareError::Message(format!(
460 "Failed to retrieve in-flight packet from storage: {err}"
461 ))
462 })?
463 .ok_or(MiddlewareError::ForwardToNextMiddleware)?;
464
465 let outcome = if inflight_packet.retries_remaining.is_some() {
466 RetryOutcome::GoAhead
467 } else {
468 RetryOutcome::MaxRetriesExceeded
469 };
470
471 Ok((outcome, inflight_packet))
472 }
473
474 fn retry_timeout(
475 &mut self,
476 extras: &mut ModuleExtras,
477 port: &PortId,
478 channel: &ChannelId,
479 transfer_pkt: PacketData,
480 inflight_packet: InFlightPacket,
481 ) -> Result<(), MiddlewareError> {
482 let next = {
483 let memo = transfer_pkt.memo.as_ref();
484
485 if !memo.is_empty() {
486 let json_obj_memo: serde_json::Map<String, serde_json::Value> =
487 serde_json::from_str(memo).map_err(|err| {
488 MiddlewareError::Message(format!("Failed to decode next memo: {err}"))
489 })?;
490 Some(json_obj_memo)
491 } else {
492 None
493 }
494 };
495 let fwd_metadata = msg::ForwardMetadata {
496 receiver: transfer_pkt.receiver,
497 port: port.clone(),
498 channel: channel.clone(),
499 timeout: Some(inflight_packet.timeout.clone()),
500 retries: {
501 debug_assert!(
502 inflight_packet.retries_remaining.is_some(),
503 "We should only hit this branch with at least one retry remaining"
504 );
505 inflight_packet.retries_remaining.map(NonZeroU8::get)
506 },
507 next,
508 };
509
510 let original_sender = inflight_packet.original_sender_address.clone();
511 let override_receiver = transfer_pkt.sender;
512 let token_and_amount = transfer_pkt.token;
513
514 self.forward_transfer_packet(
515 extras,
516 Right(inflight_packet),
517 fwd_metadata,
518 original_sender,
519 override_receiver,
520 token_and_amount,
521 )
522 }
523
524 fn on_recv_packet_execute_inner(
525 &mut self,
526 extras: &mut ModuleExtras,
527 packet: &Packet,
528 relayer: &Signer,
529 ) -> Result<(), MiddlewareError> {
530 let (transfer_pkt, fwd_metadata) = decode_forward_msg(packet)?;
531
532 let override_receiver =
533 get_receiver(&self.next, &packet.chan_id_on_b, &transfer_pkt.sender)?;
534 let denom_on_this_chain = self
535 .next
536 .get_denom_for_this_chain(
537 &packet.port_id_on_b,
538 &packet.chan_id_on_b,
539 &packet.port_id_on_a,
540 &packet.chan_id_on_a,
541 &transfer_pkt.token.denom,
542 )
543 .map_err(|err| {
544 MiddlewareError::Message(format!("Failed to get coin denom for this chain: {err}"))
545 })?;
546 let coin_on_this_chain = Coin {
547 denom: denom_on_this_chain,
548 amount: transfer_pkt.token.amount,
549 };
550 let original_sender = transfer_pkt.sender.clone();
551
552 self.receive_funds(
553 extras,
554 packet,
555 transfer_pkt,
556 override_receiver.clone(),
557 relayer,
558 )?;
559 self.forward_transfer_packet(
560 extras,
561 Left(packet),
562 fwd_metadata,
563 original_sender,
564 override_receiver,
565 coin_on_this_chain,
566 )?;
567
568 Ok(())
569 }
570
571 fn on_acknowledgement_packet_execute_inner(
572 &mut self,
573 extras: &mut ModuleExtras,
574 packet: &Packet,
575 acknowledgement: &Acknowledgement,
576 ) -> Result<(), MiddlewareError> {
577 let transfer_pkt = decode_ics20_msg(packet)?;
578
579 let inflight_packet_key = InFlightPacketKey {
580 port: packet.port_id_on_a.clone(),
581 channel: packet.chan_id_on_a.clone(),
582 sequence: packet.seq_on_a,
583 };
584
585 let inflight_packet = self
586 .next
587 .retrieve_inflight_packet(&inflight_packet_key)
588 .map_err(|err| {
589 MiddlewareError::Message(format!(
590 "Failed to retrieve in-flight packet from storage: {err}"
591 ))
592 })?
593 .ok_or(MiddlewareError::ForwardToNextMiddleware)?;
594
595 self.next
596 .delete_inflight_packet(&inflight_packet_key)
597 .map_err(|err| {
598 MiddlewareError::Message(format!(
599 "Failed to delete in-flight packet from storage: {err}"
600 ))
601 })?;
602
603 self.write_acknowledgement_for_forwarded_packet(
604 packet,
605 transfer_pkt,
606 inflight_packet,
607 acknowledgement,
608 )?;
609
610 emit_event_with_attrs(
611 extras,
612 vec![event_attr(
613 "info".to_owned(),
614 "Packet acknowledgement processed successfully".to_owned(),
615 )],
616 );
617
618 Ok(())
619 }
620
621 fn on_timeout_packet_execute_inner(
622 &mut self,
623 extras: &mut ModuleExtras,
624 packet: &Packet,
625 relayer: &Signer,
626 ) -> Result<(), MiddlewareError> {
627 let transfer_pkt = decode_ics20_msg(packet)?;
628 let should_retry = self.timeout_should_retry(packet)?;
629
630 let inflight_packet_key = InFlightPacketKey {
633 port: packet.port_id_on_a.clone(),
634 channel: packet.chan_id_on_a.clone(),
635 sequence: packet.seq_on_a,
636 };
637 self.next
638 .delete_inflight_packet(&inflight_packet_key)
639 .map_err(|err| {
640 MiddlewareError::Message(format!(
641 "Failed to delete in-flight packet from storage: {err}"
642 ))
643 })?;
644
645 match should_retry {
646 (RetryOutcome::GoAhead, inflight_packet) => {
647 let (next_extras, result) = self.next.on_timeout_packet_execute(packet, relayer);
648
649 join_module_extras(extras, next_extras);
650 result.map_err(|err| {
651 MiddlewareError::Message(format!(
652 "Failed to retry packet, while invoking \
653 on_timeout_packet_execute: {err}"
654 ))
655 })?;
656
657 self.retry_timeout(
658 extras,
659 &packet.port_id_on_a,
660 &packet.chan_id_on_a,
661 transfer_pkt,
662 inflight_packet,
663 )
664 }
665 (RetryOutcome::MaxRetriesExceeded, inflight_packet) => {
666 let acknowledgement = {
667 let InFlightPacket {
668 refund_sequence,
669 refund_port_id,
670 refund_channel_id,
671 ..
672 } = &inflight_packet;
673
674 new_error_ack(format!(
675 "In-flight packet max retries exceeded, for packet with sequence \
676 {refund_sequence} on {refund_port_id}/{refund_channel_id}"
677 ))
678 .into()
679 };
680
681 self.write_acknowledgement_for_forwarded_packet(
682 packet,
683 transfer_pkt,
684 inflight_packet,
685 &acknowledgement,
686 )
687 }
688 }
689 }
690}
691
692from_middleware! {
693 #[cfg_attr(coverage_nightly, coverage(off))]
694 impl<M> IbcCoreModule for PacketForwardMiddleware<M>
695 where
696 M: IbcCoreModule + PfmContext,
697}
698
699#[cfg_attr(coverage_nightly, coverage(off))]
700impl<M> MiddlewareModule for PacketForwardMiddleware<M>
701where
702 M: IbcCoreModule + PfmContext,
703{
704 type NextMiddleware = M;
705
706 #[inline]
707 fn next_middleware(&self) -> &M {
708 self.next()
709 }
710
711 #[inline]
712 fn next_middleware_mut(&mut self) -> &mut M {
713 self.next_mut()
714 }
715
716 fn middleware_on_recv_packet_execute(
717 &mut self,
718 packet: &Packet,
719 relayer: &Signer,
720 ) -> (ModuleExtras, Option<Acknowledgement>) {
721 let mut extras = ModuleExtras::empty();
722
723 match self.on_recv_packet_execute_inner(&mut extras, packet, relayer) {
724 Ok(()) => (extras, None),
725 Err(MiddlewareError::ForwardToNextMiddleware) => {
726 self.next.on_recv_packet_execute(packet, relayer)
727 }
728 Err(MiddlewareError::Message(err)) => (extras, Some(new_error_ack(err).into())),
729 }
730 }
731
732 fn middleware_on_acknowledgement_packet_execute(
733 &mut self,
734 packet: &Packet,
735 acknowledgement: &Acknowledgement,
736 relayer: &Signer,
737 ) -> (ModuleExtras, Result<(), ChannelError>) {
738 let mut extras = ModuleExtras::empty();
739
740 match self.on_acknowledgement_packet_execute_inner(&mut extras, packet, acknowledgement) {
741 Ok(()) => (extras, Ok(())),
742 Err(MiddlewareError::ForwardToNextMiddleware) => self
743 .next
744 .on_acknowledgement_packet_execute(packet, acknowledgement, relayer),
745 Err(MiddlewareError::Message(err)) => (extras, new_channel_error(err)),
746 }
747 }
748
749 fn middleware_on_timeout_packet_execute(
750 &mut self,
751 packet: &Packet,
752 relayer: &Signer,
753 ) -> (ModuleExtras, Result<(), ChannelError>) {
754 let mut extras = ModuleExtras::empty();
755
756 match self.on_timeout_packet_execute_inner(&mut extras, packet, relayer) {
757 Ok(()) => (extras, Ok(())),
758 Err(MiddlewareError::ForwardToNextMiddleware) => {
759 self.next.on_timeout_packet_execute(packet, relayer)
760 }
761 Err(MiddlewareError::Message(err)) => (extras, new_channel_error(err)),
762 }
763 }
764}
765
766#[inline]
767fn new_error_ack(message: impl fmt::Display) -> AcknowledgementStatus {
768 AcknowledgementStatus::error(
769 #[allow(clippy::expect_used)]
772 AckStatusValue::new(format!("{MODULE} error: {message}"))
773 .expect("Acknowledgement error must not be empty"),
774 )
775}
776
777#[inline]
778fn new_channel_error(message: impl fmt::Display) -> Result<(), ChannelError> {
779 Err(ChannelError::AppSpecific {
780 description: format!("{MODULE} error: {message}"),
781 })
782}
783
784fn get_receiver<C: PfmContext>(
785 ctx: &C,
786 channel: &ChannelId,
787 original_sender: &Signer,
788) -> Result<Signer, MiddlewareError> {
789 ctx.override_receiver(channel, original_sender)
790 .map_err(|err| MiddlewareError::Message(format!("Failed to get override receiver: {err}")))
791}
792
793fn decode_ics20_msg(packet: &Packet) -> Result<PacketData, MiddlewareError> {
794 serde_json::from_slice(&packet.data).map_err(|_| {
795 MiddlewareError::ForwardToNextMiddleware
798 })
799}
800
801fn decode_forward_msg(
802 packet: &Packet,
803) -> Result<(PacketData, msg::ForwardMetadata), MiddlewareError> {
804 let transfer_pkt = decode_ics20_msg(packet)?;
805
806 let json_obj_memo: serde_json::Map<String, serde_json::Value> =
807 serde_json::from_str(transfer_pkt.memo.as_ref()).map_err(|_| {
808 MiddlewareError::ForwardToNextMiddleware
811 })?;
812
813 if !json_obj_memo.contains_key("forward") {
814 return Err(MiddlewareError::ForwardToNextMiddleware);
817 }
818
819 serde_json::from_value(json_obj_memo.into()).map_or_else(
820 |err| Err(MiddlewareError::Message(err.to_string())),
821 |msg::PacketMetadata { forward }| Ok((transfer_pkt, forward)),
822 )
823}
824
825fn join_module_extras(first: &mut ModuleExtras, mut second: ModuleExtras) {
826 first.events.append(&mut second.events);
827 first.log.append(&mut second.log);
828}
829
830fn next_inflight_packet(
831 packet: Either<NewInFlightPacket<'_>, RetryInFlightPacket>,
832) -> InFlightPacket {
833 packet.either(
834 |NewInFlightPacket {
835 src_packet,
836 original_sender,
837 retries,
838 timeout,
839 }| InFlightPacket {
840 original_sender_address: original_sender,
841 refund_port_id: src_packet.port_id_on_b.clone(),
842 refund_channel_id: src_packet.chan_id_on_b.clone(),
843 packet_src_port_id: src_packet.port_id_on_a.clone(),
844 packet_src_channel_id: src_packet.chan_id_on_a.clone(),
845 packet_timeout_timestamp: src_packet.timeout_timestamp_on_b,
846 packet_timeout_height: src_packet.timeout_height_on_b,
847 packet_data: src_packet.data.clone(),
848 refund_sequence: src_packet.seq_on_a,
849 retries_remaining: retries,
850 timeout: msg::Duration::from_dur(timeout),
851 },
852 |inflight_packet| {
853 let retries_remaining = {
854 NonZeroU8::new(
855 #[allow(clippy::expect_used)]
857 inflight_packet
858 .retries_remaining
859 .expect("We should only hit this branch with at least one retry remaining")
860 .get()
861 .wrapping_sub(1),
862 )
863 };
864
865 RetryInFlightPacket {
866 retries_remaining,
867 ..inflight_packet
868 }
869 },
870 )
871}
872
873#[inline]
874fn event_attr<K, V>(key: K, value: V) -> ModuleEventAttribute
875where
876 K: Into<String>,
877 V: Into<String>,
878{
879 ModuleEventAttribute {
880 key: key.into(),
881 value: value.into(),
882 }
883}
884
885#[inline]
886fn push_event_attr<K, V>(attributes: &mut Vec<ModuleEventAttribute>, key: K, value: V)
887where
888 K: Into<String>,
889 V: Into<String>,
890{
891 attributes.push(event_attr(key, value));
892}
893
894#[inline]
895fn emit_event_with_attrs(extras: &mut ModuleExtras, attributes: Vec<ModuleEventAttribute>) {
896 extras.events.push(ModuleEvent {
897 kind: MODULE.to_owned(),
898 attributes,
899 });
900}
901
902#[inline]
903fn get_retries_left_for_new_pkt(input_retries: Option<u8>) -> Option<NonZeroU8> {
904 input_retries.map_or(Some(DEFAULT_FORWARD_RETRIES), NonZeroU8::new)
905}
906
907#[inline]
909fn extract_next_memo_from_pfm_packet(src_packet_data: &PacketData) -> String {
910 #[allow(clippy::unwrap_used, clippy::unreachable)]
911 let serde_json::Value::Object(mut memo_obj) =
912 serde_json::from_str(src_packet_data.memo.as_ref()).unwrap()
913 else {
914 unreachable!()
915 };
916 memo_obj.remove("forward");
917 #[allow(clippy::unwrap_used)]
918 serde_json::to_string(&memo_obj).unwrap()
919}