tycho_consensus/engine/lifecycle/
session.rs1use std::sync::Arc;
2
3use parking_lot::Mutex;
4use tokio::sync::oneshot;
5use tokio_util::task::AbortOnDropHandle;
6use tycho_types::models::GenesisInfo;
7
8use crate::effects::{Cancelled, TaskTracker};
9use crate::engine::lifecycle::recover::{EngineRecoverLoop, RunAttributes};
10use crate::engine::lifecycle::session::isolated::SpanFields;
11use crate::engine::lifecycle::{
12 EngineBinding, EngineError, EngineNetwork, EngineNetworkArgs, FixHistoryFlag,
13};
14use crate::engine::{Engine, MempoolMergedConfig};
15use crate::intercom::InitPeers;
16
17pub struct EngineSession {
18 genesis_info: GenesisInfo,
19 span_fields: SpanFields,
20 recover_loop: AbortOnDropHandle<()>,
21 run_attrs: Arc<Mutex<RunAttributes>>,
22 stop_tx: oneshot::Sender<()>,
23}
24
25impl EngineSession {
26 pub fn new(
27 bind: EngineBinding,
28 net_args: &EngineNetworkArgs,
29 merged_conf: &MempoolMergedConfig,
30 init_peers: InitPeers,
31 engine_stop_tx: oneshot::Sender<()>,
32 ) -> Self {
33 let span_fields = SpanFields::new(net_args, merged_conf);
34
35 let task_tracker = TaskTracker::default();
36 let net = EngineNetwork::new(net_args, &task_tracker, merged_conf, &init_peers);
37 let engine = Engine::new(
38 &task_tracker,
39 &bind,
40 &net,
41 merged_conf,
42 FixHistoryFlag::default(),
43 );
44 let peer_schedule = net.peer_schedule.downgrade();
45 let run_attrs = Arc::new(Mutex::new(RunAttributes {
46 tracker: task_tracker.clone(),
47 is_stopping: false,
48 peer_schedule: peer_schedule.clone(),
49 #[cfg(feature = "mock-feedback")]
50 mock_feedback: {
51 use crate::mock_feedback::MockFeedbackSender;
52 let sender = MockFeedbackSender::new(
53 net.dispatcher.clone(),
54 peer_schedule,
55 bind.top_known_anchor.clone(),
56 &init_peers,
57 net_args.network.peer_id(),
58 );
59 task_tracker.ctx().spawn(sender.run())
60 },
61 last_peers: init_peers,
62 }));
63
64 let recover_loop = AbortOnDropHandle::new(tokio::spawn(
65 EngineRecoverLoop {
66 bind,
67 net_args: net_args.clone(),
68 merged_conf: merged_conf.clone(),
69 run_attrs: run_attrs.clone(),
70 }
71 .run_loop(task_tracker.ctx().spawn(async move {
72 match engine.run().await {
73 Err(EngineError::Cancelled) => Err(Cancelled()),
74 Err(EngineError::HistoryConflict(e)) => Ok(Err(e)),
75 }
76 })),
77 ));
78
79 Self {
80 genesis_info: merged_conf.genesis_info,
81 span_fields,
82 stop_tx: engine_stop_tx,
83 run_attrs,
84 recover_loop,
85 }
86 }
87
88 pub fn genesis_info(&self) -> GenesisInfo {
89 self.genesis_info
90 }
91
92 pub fn set_peers(&self, peers: InitPeers) {
93 let mut run_attrs = self.run_attrs.lock();
94 if let Some(peer_schedule) = run_attrs.peer_schedule.upgrade() {
95 peer_schedule.set_peers(&peers);
96 }
97 run_attrs.last_peers = peers;
98 }
99
100 pub async fn stop(self) {
101 let span = self.span_fields.stop_span();
102
103 span.in_scope(|| tracing::warn!("waiting engine threads to exit"));
104
105 let engine_tracker = {
106 let mut guard = self.run_attrs.lock();
107 guard.is_stopping = true;
108 guard.tracker.clone()
109 };
110 drop(self.run_attrs); engine_tracker.stop().await;
112 self.recover_loop.await.ok();
113
114 span.in_scope(|| tracing::warn!("stop completed"));
115
116 self.stop_tx.send(()).ok();
117 }
118}
119
120mod isolated {
121 use tracing::Span;
122 use tycho_network::{OverlayId, PeerId};
123
124 use crate::effects::AltFormat;
125 use crate::engine::MempoolMergedConfig;
126 use crate::engine::lifecycle::EngineNetworkArgs;
127 use crate::models::Round;
128
129 pub struct SpanFields {
130 peer_id: PeerId,
131 overlay_id: OverlayId,
132 genesis_round: Round,
133 }
134
135 impl SpanFields {
136 pub fn new(net_args: &EngineNetworkArgs, merged_conf: &MempoolMergedConfig) -> Self {
137 Self {
138 peer_id: *net_args.network.peer_id(),
139 overlay_id: merged_conf.overlay_id,
140 genesis_round: merged_conf.conf.genesis_round,
141 }
142 }
143
144 pub fn stop_span(&self) -> Span {
145 tracing::error_span!(
146 "mempool stop in progress",
147 peer = %self.peer_id.alt(),
148 genesis_round = self.genesis_round.0,
149 overlay = %self.overlay_id,
150 )
151 }
152 }
153}