kaspa_consensus/pipeline/
deps_manager.rs1use crate::errors::BlockProcessResult;
2use kaspa_consensus_core::{block::Block, blockstatus::BlockStatus};
3use kaspa_hashes::Hash;
4use parking_lot::{Condvar, Mutex};
5use std::collections::{
6 hash_map::Entry::{Occupied, Vacant},
7 HashMap, VecDeque,
8};
9use tokio::sync::oneshot;
10
11pub type BlockResultSender = oneshot::Sender<BlockProcessResult<BlockStatus>>;
12
13pub enum BlockProcessingMessage {
14 Exit,
15 Process(BlockTask, BlockResultSender, BlockResultSender),
16}
17
18impl BlockProcessingMessage {
19 pub fn is_processing_message(&self) -> bool {
20 matches!(self, BlockProcessingMessage::Process(_, _, _))
21 }
22
23 pub fn is_exit_message(&self) -> bool {
24 matches!(self, BlockProcessingMessage::Exit)
25 }
26}
27
28pub enum VirtualStateProcessingMessage {
29 Exit,
30 Process(BlockTask, BlockResultSender),
31}
32
33impl VirtualStateProcessingMessage {
34 pub fn is_processing_message(&self) -> bool {
35 matches!(self, VirtualStateProcessingMessage::Process(_, _))
36 }
37
38 pub fn is_exit_message(&self) -> bool {
39 matches!(self, VirtualStateProcessingMessage::Exit)
40 }
41}
42
43pub enum BlockTask {
44 Ordinary { block: Block },
46
47 Trusted { block: Block },
50}
51
52impl BlockTask {
53 pub fn block(&self) -> &Block {
54 match self {
55 BlockTask::Ordinary { block } => block,
56 BlockTask::Trusted { block } => block,
57 }
58 }
59
60 pub fn is_ordinary(&self) -> bool {
61 matches!(self, BlockTask::Ordinary { .. })
62 }
63
64 pub fn is_trusted(&self) -> bool {
65 matches!(self, BlockTask::Trusted { .. })
66 }
67
68 pub fn requires_virtual_processing(&self) -> bool {
69 self.is_ordinary()
71 }
72}
73
74struct BlockTaskInternal {
76 task: Option<BlockTask>,
78
79 block_result_transmitter: BlockResultSender,
81 virtual_state_result_transmitter: BlockResultSender,
82}
83
84impl BlockTaskInternal {
85 fn new(task: BlockTask, block_result_transmitter: BlockResultSender, virtual_state_result_transmitter: BlockResultSender) -> Self {
86 Self { task: Some(task), block_result_transmitter, virtual_state_result_transmitter }
87 }
88}
89
90pub(crate) type TaskId = Hash;
91
92enum TaskQueue {
94 Empty,
95 Single(BlockTaskInternal),
96 Many(VecDeque<BlockTaskInternal>),
97}
98
99impl TaskQueue {
100 fn new(task: BlockTaskInternal) -> Self {
101 TaskQueue::Single(task)
102 }
103
104 fn push_back(&mut self, task: BlockTaskInternal) {
105 match self {
106 TaskQueue::Empty => *self = Self::Single(task),
107 TaskQueue::Single(_) => {
108 let prev = std::mem::replace(self, Self::Many(VecDeque::with_capacity(2)));
109 let TaskQueue::Single(t) = prev else { panic!() };
110 let TaskQueue::Many(q) = self else { panic!() };
111 q.push_back(t);
112 q.push_back(task);
113 }
114 TaskQueue::Many(q) => q.push_back(task),
115 }
116 }
117
118 fn front(&self) -> Option<&BlockTaskInternal> {
119 match self {
120 TaskQueue::Empty => None,
121 TaskQueue::Single(t) => Some(t),
122 TaskQueue::Many(q) => q.front(),
123 }
124 }
125
126 fn front_mut(&mut self) -> Option<&mut BlockTaskInternal> {
127 match self {
128 TaskQueue::Empty => None,
129 TaskQueue::Single(t) => Some(t),
130 TaskQueue::Many(q) => q.front_mut(),
131 }
132 }
133
134 fn pop_front(&mut self) -> Option<BlockTaskInternal> {
135 match self {
136 TaskQueue::Empty => None,
137 TaskQueue::Single(_) => {
138 let prev = std::mem::replace(self, Self::Empty);
139 let TaskQueue::Single(t) = prev else { panic!() };
140 Some(t)
141 }
142 TaskQueue::Many(q) => q.pop_front(),
143 }
144 }
145
146 fn is_empty(&self) -> bool {
147 match self {
148 TaskQueue::Empty => true,
149 TaskQueue::Single(_) => false,
150 TaskQueue::Many(q) => q.is_empty(),
151 }
152 }
153}
154
155struct BlockTaskGroup {
156 tasks: TaskQueue,
158
159 dependent_tasks: Vec<TaskId>,
161}
162
163impl BlockTaskGroup {
164 fn new(task: BlockTaskInternal) -> Self {
165 Self { tasks: TaskQueue::new(task), dependent_tasks: Vec::new() }
166 }
167}
168
169pub(crate) struct BlockTaskDependencyManager {
171 pending: Mutex<HashMap<Hash, BlockTaskGroup>>,
173
174 idle_signal: Condvar,
176}
177
178impl BlockTaskDependencyManager {
179 pub fn new() -> Self {
180 Self { pending: Mutex::new(HashMap::new()), idle_signal: Condvar::new() }
181 }
182
183 pub fn register(
189 &self,
190 task: BlockTask,
191 block_result_transmitter: BlockResultSender,
192 virtual_state_result_transmitter: BlockResultSender,
193 ) -> Option<TaskId> {
194 let mut pending = self.pending.lock();
195 let hash = task.block().hash();
196 match pending.entry(hash) {
197 Vacant(e) => {
198 e.insert(BlockTaskGroup::new(BlockTaskInternal::new(
199 task,
200 block_result_transmitter,
201 virtual_state_result_transmitter,
202 )));
203 Some(hash)
204 }
205 e => {
206 e.and_modify(|g| {
207 g.tasks.push_back(BlockTaskInternal::new(task, block_result_transmitter, virtual_state_result_transmitter));
208 });
209 None
210 }
211 }
212 }
213
214 pub fn try_begin(&self, task_id: TaskId) -> Option<BlockTask> {
219 let mut pending = self.pending.lock();
222 let group = pending.get(&task_id).expect("try_begin expects a task group");
223 let internal_task = group.tasks.front().expect("try_begin expects a task");
224 let header = internal_task.task.as_ref().expect("task is expected to not be taken").block().header.clone();
225 for parent in header.direct_parents().iter() {
226 if let Some(parent_task) = pending.get_mut(parent) {
227 parent_task.dependent_tasks.push(task_id);
228 return None; }
230 }
231 Some(pending.get_mut(&task_id).unwrap().tasks.front_mut().unwrap().task.take().unwrap())
233 }
234
235 pub fn end<F>(&self, task: BlockTask, callback: F) -> Vec<TaskId>
240 where
241 F: Fn(BlockTask, BlockResultSender, BlockResultSender),
242 {
243 let task_id = task.block().hash();
244 let mut pending = self.pending.lock();
246
247 let Occupied(mut entry) = pending.entry(task_id) else { panic!("processed task is expected to have an entry") };
248 let internal_task = entry.get_mut().tasks.pop_front().expect("same task from try_begin is expected");
249
250 let next_tasks = if entry.get().tasks.is_empty() { entry.remove().dependent_tasks } else { vec![task_id] };
253
254 assert!(internal_task.task.is_none());
256
257 callback(task, internal_task.block_result_transmitter, internal_task.virtual_state_result_transmitter);
259
260 if pending.is_empty() {
261 self.idle_signal.notify_one();
262 }
263
264 next_tasks
265 }
266
267 pub fn wait_for_idle(&self) {
269 let mut pending = self.pending.lock();
270 if !pending.is_empty() {
271 self.idle_signal.wait(&mut pending);
272 }
273 }
274}