Skip to main content

tycho_consensus/engine/lifecycle/
session.rs

1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use tokio::sync::oneshot;
5use tokio_util::task::AbortOnDropHandle;
6use tycho_types::models::GenesisInfo;
7
8use crate::effects::{Cancelled, TaskTracker};
9use crate::engine::lifecycle::recover::{EngineRecoverLoop, RunAttributes};
10use crate::engine::lifecycle::session::isolated::SpanFields;
11use crate::engine::lifecycle::{
12    EngineBinding, EngineError, EngineNetwork, EngineNetworkArgs, FixHistoryFlag,
13};
14use crate::engine::{Engine, MempoolMergedConfig};
15use crate::intercom::InitPeers;
16
17pub struct EngineSession {
18    genesis_info: GenesisInfo,
19    span_fields: SpanFields,
20    recover_loop: AbortOnDropHandle<()>,
21    run_attrs: Arc<Mutex<RunAttributes>>,
22    stop_tx: oneshot::Sender<()>,
23}
24
25impl EngineSession {
26    pub fn new(
27        bind: EngineBinding,
28        net_args: &EngineNetworkArgs,
29        merged_conf: &MempoolMergedConfig,
30        init_peers: InitPeers,
31        engine_stop_tx: oneshot::Sender<()>,
32    ) -> Self {
33        let span_fields = SpanFields::new(net_args, merged_conf);
34
35        let task_tracker = TaskTracker::default();
36        let net = EngineNetwork::new(net_args, &task_tracker, merged_conf, &init_peers);
37        let engine = Engine::new(
38            &task_tracker,
39            &bind,
40            &net,
41            merged_conf,
42            FixHistoryFlag::default(),
43        );
44        let peer_schedule = net.peer_schedule.downgrade();
45        let run_attrs = Arc::new(Mutex::new(RunAttributes {
46            tracker: task_tracker.clone(),
47            is_stopping: false,
48            peer_schedule: peer_schedule.clone(),
49            #[cfg(feature = "mock-feedback")]
50            mock_feedback: {
51                use crate::mock_feedback::MockFeedbackSender;
52                let sender = MockFeedbackSender::new(
53                    net.dispatcher.clone(),
54                    peer_schedule,
55                    bind.top_known_anchor.clone(),
56                    &init_peers,
57                    net_args.network.peer_id(),
58                );
59                task_tracker.ctx().spawn(sender.run())
60            },
61            last_peers: init_peers,
62        }));
63
64        let recover_loop = AbortOnDropHandle::new(tokio::spawn(
65            EngineRecoverLoop {
66                bind,
67                net_args: net_args.clone(),
68                merged_conf: merged_conf.clone(),
69                run_attrs: run_attrs.clone(),
70            }
71            .run_loop(task_tracker.ctx().spawn(async move {
72                match engine.run().await {
73                    Err(EngineError::Cancelled) => Err(Cancelled()),
74                    Err(EngineError::HistoryConflict(e)) => Ok(Err(e)),
75                }
76            })),
77        ));
78
79        Self {
80            genesis_info: merged_conf.genesis_info,
81            span_fields,
82            stop_tx: engine_stop_tx,
83            run_attrs,
84            recover_loop,
85        }
86    }
87
88    pub fn genesis_info(&self) -> GenesisInfo {
89        self.genesis_info
90    }
91
92    pub fn set_peers(&self, peers: InitPeers) {
93        let mut run_attrs = self.run_attrs.lock();
94        if let Some(peer_schedule) = run_attrs.peer_schedule.upgrade() {
95            peer_schedule.set_peers(&peers);
96        }
97        run_attrs.last_peers = peers;
98    }
99
100    pub async fn stop(self) {
101        let span = self.span_fields.stop_span();
102
103        span.in_scope(|| tracing::warn!("waiting engine threads to exit"));
104
105        let engine_tracker = {
106            let mut guard = self.run_attrs.lock();
107            guard.is_stopping = true;
108            guard.tracker.clone()
109        };
110        drop(self.run_attrs); // drops `PeerSchedule` clone inside
111        engine_tracker.stop().await;
112        self.recover_loop.await.ok();
113
114        span.in_scope(|| tracing::warn!("stop completed"));
115
116        self.stop_tx.send(()).ok();
117    }
118}
119
120mod isolated {
121    use tracing::Span;
122    use tycho_network::{OverlayId, PeerId};
123
124    use crate::effects::AltFormat;
125    use crate::engine::MempoolMergedConfig;
126    use crate::engine::lifecycle::EngineNetworkArgs;
127    use crate::models::Round;
128
129    pub struct SpanFields {
130        peer_id: PeerId,
131        overlay_id: OverlayId,
132        genesis_round: Round,
133    }
134
135    impl SpanFields {
136        pub fn new(net_args: &EngineNetworkArgs, merged_conf: &MempoolMergedConfig) -> Self {
137            Self {
138                peer_id: *net_args.network.peer_id(),
139                overlay_id: merged_conf.overlay_id,
140                genesis_round: merged_conf.conf.genesis_round,
141            }
142        }
143
144        pub fn stop_span(&self) -> Span {
145            tracing::error_span!(
146                "mempool stop in progress",
147                peer = %self.peer_id.alt(),
148                genesis_round = self.genesis_round.0,
149                overlay = %self.overlay_id,
150            )
151        }
152    }
153}