// arc_malachitebft_engine/sync.rs
//
// Sync actor: exchanges status updates and decided-value requests/responses
// with peers to bring a lagging node up to the network's tip height.
1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::ops::RangeInclusive;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use bytesize::ByteSize;
9use derive_where::derive_where;
10use eyre::eyre;
11use ractor::{Actor, ActorProcessingErr, ActorRef};
12use rand::SeedableRng;
13use tokio::task::JoinHandle;
14use tracing::{debug, error, info, warn, Instrument};
15
16use malachitebft_codec as codec;
17use malachitebft_core_consensus::util::bounded_queue::BoundedQueue;
18use malachitebft_core_consensus::PeerId;
19use malachitebft_core_types::utils::height::DisplayRange;
20use malachitebft_core_types::ValueResponse as CoreValueResponse;
21use malachitebft_core_types::{CommitCertificate, Context};
22use malachitebft_sync::{
23    self as sync, HeightStartType, InboundRequestId, OutboundRequestId, RawDecidedValue, Request,
24    Response, Resumable,
25};
26
27use crate::consensus::{ConsensusMsg, ConsensusRef};
28use crate::host::{HostMsg, HostRef};
29use crate::network::{NetworkEvent, NetworkMsg, NetworkRef, Status};
30use crate::util::ticker::ticker;
31use crate::util::timers::{TimeoutElapsed, TimerScheduler};
32
/// Codec for sync protocol messages
///
/// This trait is automatically implemented for any type that implements:
/// - [`codec::Codec<sync::Status<Ctx>>`]
/// - [`codec::Codec<sync::Request<Ctx>>`]
/// - [`codec::Codec<sync::Response<Ctx>>`]
/// - [`codec::HasEncodedLen<sync::Response<Ctx>>`]
pub trait SyncCodec<Ctx>
where
    Ctx: Context,
    Self: codec::Codec<sync::Status<Ctx>>,
    Self: codec::Codec<sync::Request<Ctx>>,
    Self: codec::Codec<sync::Response<Ctx>>,
    // `HasEncodedLen` lets us measure a response's encoded size without
    // actually encoding it, used when truncating oversized responses.
    Self: codec::HasEncodedLen<sync::Response<Ctx>>,
{
}
48
// Blanket implementation: any codec that satisfies the four bounds above
// is automatically a `SyncCodec`; no manual opt-in is required.
impl<Ctx, Codec> SyncCodec<Ctx> for Codec
where
    Ctx: Context,
    Codec: codec::Codec<sync::Status<Ctx>>,
    Codec: codec::Codec<sync::Request<Ctx>>,
    Codec: codec::Codec<sync::Response<Ctx>>,
    Codec: codec::HasEncodedLen<sync::Response<Ctx>>,
{
}
58
/// Timeouts tracked by the sync actor's timer scheduler.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Timeout {
    /// An outbound sync request that has not yet received a response.
    Request(OutboundRequestId),
}

/// Timer scheduler specialized to sync [`Timeout`]s.
type Timers = TimerScheduler<Timeout>;

/// Handle to the sync actor.
pub type SyncRef<Ctx> = ActorRef<Msg<Ctx>>;
/// Message type handled by the sync actor.
pub type SyncMsg<Ctx> = Msg<Ctx>;
68
/// A decided block as received from the network: the commit certificate
/// plus the still-encoded value bytes.
#[derive_where(Clone, Debug)]
pub struct RawDecidedBlock<Ctx: Context> {
    /// Height at which the value was decided.
    pub height: Ctx::Height,
    /// Commit certificate proving the decision.
    pub certificate: CommitCertificate<Ctx>,
    /// The decided value, not yet decoded.
    pub value_bytes: Bytes,
}
75
/// A sync request that has been sent to a peer and is awaiting a response
/// (or a timeout).
#[derive_where(Clone, Debug)]
pub struct InflightRequest<Ctx: Context> {
    /// Peer the request was sent to.
    pub peer_id: PeerId,
    /// Identifier assigned by the network layer when the request was sent.
    pub request_id: OutboundRequestId,
    /// The original request, kept so it can be retried/reported on timeout.
    pub request: Request<Ctx>,
}

/// All currently in-flight requests, keyed by their outbound request id.
pub type InflightRequests<Ctx> = HashMap<OutboundRequestId, InflightRequest<Ctx>>;
84
/// Messages handled by the sync actor.
#[derive_where(Clone, Debug)]
pub enum Msg<Ctx: Context> {
    /// Internal tick, used to periodically broadcast our status
    Tick,

    /// Receive an event from the gossip layer
    NetworkEvent(NetworkEvent<Ctx>),

    /// Consensus has decided on a value at the given height
    Decided(Ctx::Height),

    /// Consensus has (re)started a new height.
    ///
    /// The second argument indicates whether this is a restart or not.
    StartedHeight(Ctx::Height, HeightStartType),

    /// Host has a response for the blocks request
    GotDecidedValues(
        InboundRequestId,
        RangeInclusive<Ctx::Height>,
        Vec<RawDecidedValue<Ctx>>,
    ),

    /// A timeout has elapsed
    TimeoutElapsed(TimeoutElapsed<Timeout>),

    /// We received an invalid value (either certificate or value) from a peer
    InvalidValue(PeerId, Ctx::Height),

    /// An error occurred while processing a value
    ValueProcessingError(PeerId, Ctx::Height),
}
117
118impl<Ctx: Context> From<NetworkEvent<Ctx>> for Msg<Ctx> {
119    fn from(event: NetworkEvent<Ctx>) -> Self {
120        Msg::NetworkEvent(event)
121    }
122}
123
124impl<Ctx: Context> From<TimeoutElapsed<Timeout>> for Msg<Ctx> {
125    fn from(elapsed: TimeoutElapsed<Timeout>) -> Self {
126        Msg::TimeoutElapsed(elapsed)
127    }
128}
129
/// Tunable parameters for the sync actor.
#[derive(Debug)]
pub struct Params {
    /// Interval at which to update other peers of our status
    /// If set to 0s, status updates are sent eagerly right after each decision.
    /// Default: 5s
    pub status_update_interval: Duration,

    /// Timeout duration for sync requests
    /// Default: 10s
    pub request_timeout: Duration,
}
141
142impl Default for Params {
143    fn default() -> Self {
144        Self {
145            status_update_interval: Duration::from_secs(5),
146            request_timeout: Duration::from_secs(10),
147        }
148    }
149}
150
/// A sync value buffered in the queue, tagged with the request that produced it.
///
/// The request id makes it possible to evict all buffered values originating
/// from a request that later turns out to be invalid (see `Msg::InvalidValue`).
#[derive_where(Clone, Debug)]
struct BufferedValue<Ctx: Context> {
    // Outbound request this value arrived in response to.
    request_id: OutboundRequestId,
    // The decoded value response, ready to hand to consensus.
    value: CoreValueResponse<Ctx>,
}

impl<Ctx: Context> BufferedValue<Ctx> {
    /// Tags `value` with the request it came from.
    fn new(request_id: OutboundRequestId, value: CoreValueResponse<Ctx>) -> Self {
        Self { request_id, value }
    }
}

/// A queue of buffered sync values for heights ahead of consensus, keyed by height.
type SyncQueue<Ctx> = BoundedQueue<<Ctx as Context>::Height, BufferedValue<Ctx>>;
166
/// The mode for sending status updates
enum StatusUpdateMode {
    /// Send status updates at regular intervals
    Interval(JoinHandle<()>), // the ticker task handle, aborted on actor stop

    /// Send status updates with tip height when starting a new height
    OnStartedHeight,
}
175
/// Mutable state of the sync actor, owned by the actor runtime and threaded
/// through every message handler.
pub struct State<Ctx: Context> {
    /// The state of the sync state machine
    sync: sync::State<Ctx>,

    /// Scheduler for timers
    timers: Timers,

    /// In-flight requests
    inflight: InflightRequests<Ctx>,

    /// Queue of sync value responses for heights ahead of consensus
    sync_queue: SyncQueue<Ctx>,

    /// Status update mode
    status_update_mode: StatusUpdateMode,
}
192
/// Borrowed view of [`State`] handed to effect handlers, so that effects can
/// mutate timers/in-flight/queue state while the sync state machine itself
/// is mutably borrowed elsewhere.
struct HandlerState<'a, Ctx: Context> {
    /// Scheduler for timers, used to start new timers for outgoing requests
    /// and correlate elapsed timers to the original request and peer.
    timers: &'a mut Timers,
    /// In-flight requests, used to correlate timeouts and responses to the original request and peer.
    inflight: &'a mut InflightRequests<Ctx>,
    /// Buffer for sync responses for heights ahead of consensus, keyed by height.
    sync_queue: &'a mut SyncQueue<Ctx>,
    /// The current consensus height according to the last processed input.
    consensus_height: Ctx::Height,
}
204
/// The sync actor: immutable configuration and handles to the actors it
/// collaborates with (network, host, consensus).
#[allow(dead_code)]
pub struct Sync<Ctx, Codec>
where
    Ctx: Context,
    Codec: SyncCodec<Ctx>,
{
    ctx: Ctx,
    // Network actor, used to send/receive sync requests and status broadcasts.
    network: NetworkRef<Ctx>,
    // Host actor, queried for decided values and the earliest stored height.
    host: HostRef<Ctx>,
    // Consensus actor, to which synced values are forwarded.
    consensus: ConsensusRef<Ctx>,
    params: Params,
    // Codec used to measure encoded response sizes (see `HasEncodedLen` bound).
    sync_codec: Codec,
    sync_config: sync::Config,
    metrics: sync::Metrics,
    // Parent span for all tracing output of this actor.
    span: tracing::Span,
}
221
222impl<Ctx, Codec> Sync<Ctx, Codec>
223where
224    Ctx: Context,
225    Codec: SyncCodec<Ctx>,
226{
227    #[allow(clippy::too_many_arguments)]
228    pub fn new(
229        ctx: Ctx,
230        network: NetworkRef<Ctx>,
231        host: HostRef<Ctx>,
232        consensus: ConsensusRef<Ctx>,
233        params: Params,
234        sync_codec: Codec,
235        sync_config: sync::Config,
236        metrics: sync::Metrics,
237        span: tracing::Span,
238    ) -> Self {
239        Self {
240            ctx,
241            network,
242            host,
243            consensus,
244            params,
245            sync_codec,
246            sync_config,
247            metrics,
248            span,
249        }
250    }
251
252    #[allow(clippy::too_many_arguments)]
253    pub async fn spawn(
254        ctx: Ctx,
255        network: NetworkRef<Ctx>,
256        host: HostRef<Ctx>,
257        consensus: ConsensusRef<Ctx>,
258        params: Params,
259        sync_codec: Codec,
260        sync_config: sync::Config,
261        metrics: sync::Metrics,
262        span: tracing::Span,
263    ) -> Result<SyncRef<Ctx>, ractor::SpawnErr> {
264        let actor = Self::new(
265            ctx,
266            network,
267            host,
268            consensus,
269            params,
270            sync_codec,
271            sync_config,
272            metrics,
273            span,
274        );
275        let (actor_ref, _) = Actor::spawn(None, actor, ()).await?;
276        Ok(actor_ref)
277    }
278
    /// Feeds one input into the sync state machine, executing every effect it
    /// yields via [`Self::handle_effect`] until the machine runs to completion.
    async fn process_input(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut State<Ctx>,
        input: sync::Input<Ctx>,
    ) -> Result<(), ActorProcessingErr> {
        // Split-borrow the actor state so effect handlers can mutate the
        // timers/in-flight/queue while `state.sync` is borrowed by the machine.
        let mut handler_state = HandlerState {
            timers: &mut state.timers,
            inflight: &mut state.inflight,
            sync_queue: &mut state.sync_queue,
            consensus_height: state.sync.consensus_height,
        };

        malachitebft_sync::process!(
            input: input,
            state: &mut state.sync,
            metrics: &self.metrics,
            with: effect => {
                self.handle_effect(
                    myself,
                    &mut handler_state,
                    effect,
                ).await
            }
        )
    }
305
    /// Asks the host for the earliest height still available in its history,
    /// advertised to peers so they know what range we can serve.
    async fn get_history_min_height(&self) -> Result<Ctx::Height, ActorProcessingErr> {
        ractor::call!(self.host, |reply_to| HostMsg::GetHistoryMinHeight {
            reply_to
        })
        .map_err(|e| eyre!("Failed to get earliest history height: {e:?}").into())
    }
312
    /// Executes a single effect yielded by the sync state machine and resumes
    /// the machine with the effect's result.
    async fn handle_effect(
        &self,
        myself: &ActorRef<Msg<Ctx>>,
        state: &mut HandlerState<'_, Ctx>,
        effect: sync::Effect<Ctx>,
    ) -> Result<sync::Resume<Ctx>, ActorProcessingErr> {
        use sync::Effect;

        match effect {
            // Broadcast our tip height and earliest available height to peers.
            Effect::BroadcastStatus(height, r) => {
                let history_min_height = self.get_history_min_height().await?;

                self.network.cast(NetworkMsg::BroadcastStatus(Status::new(
                    height,
                    history_min_height,
                )))?;

                Ok(r.resume_with(()))
            }

            // Send a value request to a peer; on success, start a timeout
            // timer and record the request as in-flight.
            Effect::SendValueRequest(peer_id, value_request, r) => {
                let request = Request::ValueRequest(value_request);
                let result = ractor::call!(self.network, |reply_to| {
                    NetworkMsg::OutgoingRequest(peer_id, request.clone(), reply_to)
                });

                match result {
                    Ok(request_id) => {
                        let request_id = OutboundRequestId::new(request_id);

                        // Arm the per-request timeout; cancelled when the
                        // response arrives (see `Msg::NetworkEvent(SyncResponse)`).
                        state.timers.start_timer(
                            Timeout::Request(request_id.clone()),
                            self.params.request_timeout,
                        );

                        state.inflight.insert(
                            request_id.clone(),
                            InflightRequest {
                                peer_id,
                                request_id: request_id.clone(),
                                request,
                            },
                        );

                        info!(%peer_id, %request_id, "Sent value request to peer");

                        Ok(r.resume_with(Some(request_id)))
                    }
                    Err(e) => {
                        // Resume with `None` so the state machine can pick
                        // another peer instead of aborting the actor.
                        error!("Failed to send request to network layer: {e}");
                        Ok(r.resume_with(None))
                    }
                }
            }

            // Answer a peer's value request.
            Effect::SendValueResponse(request_id, value_response, r) => {
                let response = Response::ValueResponse(value_response);
                self.network
                    .cast(NetworkMsg::OutgoingResponse(request_id, response))?;

                Ok(r.resume_with(()))
            }

            // Fetch decided values from the host; the reply is forwarded back
            // to ourselves as `Msg::GotDecidedValues`.
            Effect::GetDecidedValues(request_id, range, r) => {
                self.host.call_and_forward(
                    {
                        // Clone so the original `range` can be captured by the
                        // forwarding closure below.
                        let range = range.clone();
                        |reply_to| HostMsg::GetDecidedValues { range, reply_to }
                    },
                    myself,
                    |values| Msg::<Ctx>::GotDecidedValues(request_id, range, values),
                    None,
                )?;

                Ok(r.resume_with(()))
            }

            // Route a received value response to consensus or the buffer queue.
            Effect::ProcessValueResponse(peer_id, request_id, response, r) => {
                self.process_value_response(state, peer_id, request_id, response);
                Ok(r.resume_with(()))
            }
        }
    }
396
    /// Dispatches each value in a response relative to the current consensus
    /// height: already-decided heights are ignored, the current height is
    /// forwarded to consensus immediately, and future heights are buffered
    /// in the sync queue until consensus reaches them.
    fn process_value_response(
        &self,
        state: &mut HandlerState<'_, Ctx>,
        peer_id: PeerId,
        request_id: OutboundRequestId,
        response: sync::ValueResponse<Ctx>,
    ) {
        let consensus_height = state.consensus_height;
        let mut ignored = Vec::new();
        let mut buffered = Vec::new();

        for raw_value in response.values {
            let height = raw_value.height();
            let value = raw_value.to_core(peer_id);

            match height.cmp(&consensus_height) {
                // The value is for a height that has already been decided, ignore it.
                Ordering::Less => {
                    ignored.push(height);
                }

                // The value is for a height ahead of consensus, buffer it for later processing when we reach that height.
                Ordering::Greater => {
                    let buffered_value = BufferedValue::new(request_id.clone(), value);
                    // `push` returns false when the bounded queue is full.
                    if state.sync_queue.push(height, buffered_value) {
                        buffered.push(height);
                    } else {
                        warn!(%peer_id, %request_id, %height, "Failed to buffer sync response, queue is full");
                    }
                }

                // The value is for the current consensus height, process it immediately.
                Ordering::Equal => {
                    debug!(%peer_id, %request_id, %height, "Processing value for current consensus height");

                    if let Err(e) = self
                        .consensus
                        .cast(ConsensusMsg::ProcessSyncResponse(value))
                    {
                        error!("Failed to forward value response to consensus: {e}");
                    }
                }
            }
        }

        // Reflect the new queue length/size in the metrics.
        self.metrics
            .sync_queue_updated(state.sync_queue.len(), state.sync_queue.size());

        if !ignored.is_empty() {
            debug!(
                %peer_id, %request_id, ?ignored,
                "Ignored {} values for already decided heights", ignored.len()
            );
        }

        if !buffered.is_empty() {
            debug!(
                %peer_id, %request_id, ?buffered,
                "Buffered {} values for heights ahead of consensus", buffered.len()
            );
        }
    }
459
460    async fn handle_msg(
461        &self,
462        myself: ActorRef<Msg<Ctx>>,
463        msg: Msg<Ctx>,
464        state: &mut State<Ctx>,
465    ) -> Result<(), ActorProcessingErr> {
466        match msg {
467            Msg::Tick => {
468                self.process_input(&myself, state, sync::Input::SendStatusUpdate)
469                    .await?;
470            }
471
472            Msg::NetworkEvent(NetworkEvent::PeerDisconnected(peer_id)) => {
473                info!(%peer_id, "Disconnected from peer");
474
475                if state.sync.peers.remove(&peer_id).is_some() {
476                    debug!(%peer_id, "Removed disconnected peer");
477                }
478            }
479
480            Msg::NetworkEvent(NetworkEvent::Status(peer_id, status)) => {
481                let status = sync::Status {
482                    peer_id,
483                    tip_height: status.tip_height,
484                    history_min_height: status.history_min_height,
485                };
486
487                self.process_input(&myself, state, sync::Input::Status(status))
488                    .await?;
489            }
490
491            Msg::NetworkEvent(NetworkEvent::SyncRequest(request_id, from, request)) => {
492                match request {
493                    Request::ValueRequest(value_request) => {
494                        self.process_input(
495                            &myself,
496                            state,
497                            sync::Input::ValueRequest(request_id, from, value_request),
498                        )
499                        .await?;
500                    }
501                };
502            }
503
504            Msg::NetworkEvent(NetworkEvent::SyncResponse(request_id, peer, response)) => {
505                // Cancel the timer associated with the request for which we just received a response
506                state.timers.cancel(&Timeout::Request(request_id.clone()));
507
508                // Remove the in-flight request
509                if state.inflight.remove(&request_id).is_none() {
510                    debug!(%request_id, %peer, "Received response for unknown request");
511
512                    // Ignore response for unknown request
513                    // This can happen if the request timed out and was removed from in-flight requests
514                    // in the meantime or if we receive a duplicate response.
515                    return Ok(());
516                }
517
518                let response = response.map(|resp| match resp {
519                    Response::ValueResponse(value_response) => value_response,
520                });
521
522                self.process_input(
523                    &myself,
524                    state,
525                    sync::Input::ValueResponse(request_id, peer, response),
526                )
527                .await?;
528            }
529
530            Msg::NetworkEvent(_) => {
531                // Ignore other gossip events
532            }
533
534            // (Re)Started a new height
535            Msg::StartedHeight(height, restart) => {
536                if restart.is_restart() {
537                    // Clear the sync queue
538                    state.sync_queue.clear();
539                    self.metrics.sync_queue_updated(0, 0);
540                }
541
542                self.process_input(&myself, state, sync::Input::StartedHeight(height, restart))
543                    .await?;
544
545                // If in OnStartedHeight mode, send a status update for the previous decision,
546                // now that we know for sure that the application has stored the decided value,
547                // and we have updated our tip height.
548                if let StatusUpdateMode::OnStartedHeight = &state.status_update_mode {
549                    self.process_input(&myself, state, sync::Input::SendStatusUpdate)
550                        .await?;
551                }
552
553                // Drain buffered sync responses for this height
554                for buffered in state.sync_queue.shift_and_take(&height) {
555                    if let Err(e) = self
556                        .consensus
557                        .cast(ConsensusMsg::ProcessSyncResponse(buffered.value))
558                    {
559                        error!("Failed to forward buffered sync response to consensus: {e}");
560                        break;
561                    }
562                }
563
564                // Update metrics
565                self.metrics
566                    .sync_queue_heights
567                    .set(state.sync_queue.len() as i64);
568                self.metrics
569                    .sync_queue_size
570                    .set(state.sync_queue.size() as i64);
571            }
572
573            // Decided on a value
574            Msg::Decided(height) => {
575                self.process_input(&myself, state, sync::Input::Decided(height))
576                    .await?;
577            }
578
579            // Received decided values from host
580            //
581            // We need to ensure that the total size of the response does not exceed the maximum allowed size.
582            // If it does, we truncate the response accordingly.
583            // This is to prevent sending overly large messages that could lead to network issues.
584            Msg::GotDecidedValues(request_id, range, mut values) => {
585                debug!(
586                    %request_id,
587                    range = %DisplayRange(&range),
588                    values_count = values.len(),
589                    "Processing decided values from host"
590                );
591
592                // Filter values to respect maximum response size
593                let max_response_size = ByteSize::b(self.sync_config.max_response_size as u64);
594                truncate_values_to_size_limit(&mut values, max_response_size, &self.sync_codec);
595
596                self.process_input(
597                    &myself,
598                    state,
599                    sync::Input::GotDecidedValues(request_id, range, values),
600                )
601                .await?;
602            }
603
604            Msg::InvalidValue(peer, height) => {
605                // Remove buffered values that came from the same request as the invalid value.
606                // This prevents stale values from a bad peer from being drained to consensus
607                // when the height advances.
608                if let Some((request_id, _)) = state.sync.get_request_id_by(height) {
609                    let removed = state.sync_queue.retain(|_, bv| bv.request_id != request_id);
610
611                    if removed > 0 {
612                        debug!(
613                            %peer, %height, %request_id, removed,
614                            "Removed buffered values from invalidated request"
615                        );
616                        self.metrics
617                            .sync_queue_updated(state.sync_queue.len(), state.sync_queue.size());
618                    }
619                }
620
621                self.process_input(&myself, state, sync::Input::InvalidValue(peer, height))
622                    .await?
623            }
624
625            Msg::ValueProcessingError(peer, height) => {
626                self.process_input(
627                    &myself,
628                    state,
629                    sync::Input::ValueProcessingError(peer, height),
630                )
631                .await?
632            }
633
634            Msg::TimeoutElapsed(elapsed) => {
635                let Some(timeout) = state.timers.intercept_timer_msg(elapsed) else {
636                    // Timer was cancelled or already processed, ignore
637                    return Ok(());
638                };
639
640                info!(?timeout, "Timeout elapsed");
641
642                match timeout {
643                    Timeout::Request(request_id) => {
644                        if let Some(inflight) = state.inflight.remove(&request_id) {
645                            self.process_input(
646                                &myself,
647                                state,
648                                sync::Input::SyncRequestTimedOut(
649                                    request_id,
650                                    inflight.peer_id,
651                                    inflight.request,
652                                ),
653                            )
654                            .await?;
655                        } else {
656                            debug!(%request_id, "Timeout for unknown request");
657                        }
658                    }
659                }
660            }
661        }
662
663        Ok(())
664    }
665}
666
667fn status_update_mode<Ctx, R>(
668    interval: Duration,
669    sync: &ActorRef<Msg<Ctx>>,
670    rng: &mut R,
671) -> StatusUpdateMode
672where
673    Ctx: Context,
674    R: rand::Rng,
675{
676    if interval == Duration::ZERO {
677        info!("Using status update mode: OnStartedHeight");
678        StatusUpdateMode::OnStartedHeight
679    } else {
680        info!("Using status update mode: Interval");
681
682        // One-time uniform adjustment factor [-1%, +1%]
683        const ADJ_RATE: f64 = 0.01;
684        let adjustment = rng.gen_range(-ADJ_RATE..=ADJ_RATE);
685
686        let ticker = tokio::spawn(
687            ticker(interval, sync.clone(), adjustment, || Msg::Tick).in_current_span(),
688        );
689
690        StatusUpdateMode::Interval(ticker)
691    }
692}
693
694fn truncate_values_to_size_limit<Ctx, Codec>(
695    values: &mut Vec<RawDecidedValue<Ctx>>,
696    max_response_size: ByteSize,
697    codec: &Codec,
698) where
699    Ctx: Context,
700    Codec: SyncCodec<Ctx>,
701{
702    let mut current_size = ByteSize::b(0);
703    let mut keep_count = 0;
704
705    for value in values.iter() {
706        let height = value.certificate.height;
707
708        let value_response =
709            Response::ValueResponse(sync::ValueResponse::new(height, vec![value.clone()]));
710
711        let value_size = match codec.encoded_len(&value_response) {
712            Ok(size) => ByteSize::b(size as u64),
713            Err(e) => {
714                error!("Failed to get response size for value, stopping at height {height}: {e}");
715                break;
716            }
717        };
718
719        if current_size + value_size > max_response_size {
720            warn!(
721                %max_response_size, %current_size, %value_size,
722                "Maximum size limit would be exceeded, stopping at height {height}"
723            );
724            break;
725        }
726
727        current_size += value_size;
728        keep_count += 1;
729    }
730
731    // Drop the remaining elements past the size limit
732    values.truncate(keep_count);
733}
734
#[async_trait]
impl<Ctx, Codec> Actor for Sync<Ctx, Codec>
where
    Ctx: Context,
    Codec: SyncCodec<Ctx>,
{
    type Msg = Msg<Ctx>;
    type State = State<Ctx>;
    type Arguments = ();

    /// Initializes the actor: subscribes to network events, picks the status
    /// update mode, and builds the initial [`State`].
    async fn pre_start(
        &self,
        myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        self.network
            .cast(NetworkMsg::Subscribe(Box::new(myself.clone())))?;

        let mut rng = Box::new(rand::rngs::StdRng::from_entropy());

        let status_update_mode =
            status_update_mode(self.params.status_update_interval, &myself, &mut rng);

        // NOTE: The queue capacity is set to accommodate all individual values for the
        // maximum number of parallel requests and batch size, with some additional buffer.
        let queue_capacity = 2 * self.sync_config.parallel_requests * self.sync_config.batch_size;

        Ok(State {
            sync: sync::State::new(rng, self.sync_config),
            timers: Timers::new(Box::new(myself.clone())),
            inflight: HashMap::new(),
            sync_queue: SyncQueue::new(queue_capacity),
            status_update_mode,
        })
    }

    /// Handles one message, logging (rather than propagating) any error so a
    /// single failed message does not bring the actor down.
    #[tracing::instrument(
        name = "sync",
        parent = &self.span,
        skip_all,
        fields(
            tip_height = %state.sync.tip_height,
            sync_height = %state.sync.sync_height,
        ),
    )]
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        msg: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let Err(e) = self.handle_msg(myself, msg, state).await {
            error!("Error handling message: {e:?}");
        }

        Ok(())
    }

    /// Stops the background status ticker (if any) when the actor shuts down.
    async fn post_stop(
        &self,
        _myself: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let StatusUpdateMode::Interval(ticker) = &state.status_update_mode {
            ticker.abort();
        }

        Ok(())
    }
}