// kaspa_consensus/pipeline/deps_manager.rs

1use crate::errors::BlockProcessResult;
2use kaspa_consensus_core::{block::Block, blockstatus::BlockStatus};
3use kaspa_hashes::Hash;
4use parking_lot::{Condvar, Mutex};
5use std::collections::{
6    hash_map::Entry::{Occupied, Vacant},
7    HashMap, VecDeque,
8};
9use tokio::sync::oneshot;
10
/// Oneshot sender used to transmit a block's processing result
/// (`BlockProcessResult<BlockStatus>`) back to the async caller that submitted the block.
pub type BlockResultSender = oneshot::Sender<BlockProcessResult<BlockStatus>>;

/// A message consumed by the block-processing pipeline workers.
pub enum BlockProcessingMessage {
    /// Signals the worker to exit its processing loop
    Exit,
    /// A block task to process, along with two result senders: one for the block
    /// processing result and one for the virtual-state processing result
    Process(BlockTask, BlockResultSender, BlockResultSender),
}
17
18impl BlockProcessingMessage {
19    pub fn is_processing_message(&self) -> bool {
20        matches!(self, BlockProcessingMessage::Process(_, _, _))
21    }
22
23    pub fn is_exit_message(&self) -> bool {
24        matches!(self, BlockProcessingMessage::Exit)
25    }
26}
27
/// A message consumed by the virtual-state processing worker.
pub enum VirtualStateProcessingMessage {
    /// Signals the worker to exit its processing loop
    Exit,
    /// A block task along with the sender for its virtual-state processing result
    Process(BlockTask, BlockResultSender),
}
32
33impl VirtualStateProcessingMessage {
34    pub fn is_processing_message(&self) -> bool {
35        matches!(self, VirtualStateProcessingMessage::Process(_, _))
36    }
37
38    pub fn is_exit_message(&self) -> bool {
39        matches!(self, VirtualStateProcessingMessage::Exit)
40    }
41}
42
/// The kinds of block processing tasks accepted by the pipeline.
pub enum BlockTask {
    /// Ordinary block processing task, requiring full validation. The block might be header-only
    Ordinary { block: Block },

    /// Trusted block processing task, only requiring partial validation.
    /// Trusted blocks arrive as part of the pruning proof; the block might be header-only.
    Trusted { block: Block },
}
51
52impl BlockTask {
53    pub fn block(&self) -> &Block {
54        match self {
55            BlockTask::Ordinary { block } => block,
56            BlockTask::Trusted { block } => block,
57        }
58    }
59
60    pub fn is_ordinary(&self) -> bool {
61        matches!(self, BlockTask::Ordinary { .. })
62    }
63
64    pub fn is_trusted(&self) -> bool {
65        matches!(self, BlockTask::Trusted { .. })
66    }
67
68    pub fn requires_virtual_processing(&self) -> bool {
69        // Trusted blocks should not trigger virtual processing
70        self.is_ordinary()
71    }
72}
73
/// An internal struct used to manage a block processing task
struct BlockTaskInternal {
    // The externally accepted block task. `None` while the task has been taken by a
    // worker (see `BlockTaskDependencyManager::try_begin`, which `take`s it)
    task: Option<BlockTask>,

    // Channel senders for transmitting the processing results of this task to the async callers
    block_result_transmitter: BlockResultSender,
    virtual_state_result_transmitter: BlockResultSender,
}
83
84impl BlockTaskInternal {
85    fn new(task: BlockTask, block_result_transmitter: BlockResultSender, virtual_state_result_transmitter: BlockResultSender) -> Self {
86        Self { task: Some(task), block_result_transmitter, virtual_state_result_transmitter }
87    }
88}
89
/// Identifies a task by the hash of the block it processes
pub(crate) type TaskId = Hash;

/// We usually only have a single task per hash. This enum optimizes for this
/// common case by storing the single task inline and only allocating a
/// `VecDeque` once a second task with the same hash arrives.
enum TaskQueue {
    // No tasks (reached after the single inline task is popped)
    Empty,
    // Exactly one task — the common, allocation-free case
    Single(BlockTaskInternal),
    // Two or more tasks (the deque may also drain to empty via pops)
    Many(VecDeque<BlockTaskInternal>),
}
98
99impl TaskQueue {
100    fn new(task: BlockTaskInternal) -> Self {
101        TaskQueue::Single(task)
102    }
103
104    fn push_back(&mut self, task: BlockTaskInternal) {
105        match self {
106            TaskQueue::Empty => *self = Self::Single(task),
107            TaskQueue::Single(_) => {
108                let prev = std::mem::replace(self, Self::Many(VecDeque::with_capacity(2)));
109                let TaskQueue::Single(t) = prev else { panic!() };
110                let TaskQueue::Many(q) = self else { panic!() };
111                q.push_back(t);
112                q.push_back(task);
113            }
114            TaskQueue::Many(q) => q.push_back(task),
115        }
116    }
117
118    fn front(&self) -> Option<&BlockTaskInternal> {
119        match self {
120            TaskQueue::Empty => None,
121            TaskQueue::Single(t) => Some(t),
122            TaskQueue::Many(q) => q.front(),
123        }
124    }
125
126    fn front_mut(&mut self) -> Option<&mut BlockTaskInternal> {
127        match self {
128            TaskQueue::Empty => None,
129            TaskQueue::Single(t) => Some(t),
130            TaskQueue::Many(q) => q.front_mut(),
131        }
132    }
133
134    fn pop_front(&mut self) -> Option<BlockTaskInternal> {
135        match self {
136            TaskQueue::Empty => None,
137            TaskQueue::Single(_) => {
138                let prev = std::mem::replace(self, Self::Empty);
139                let TaskQueue::Single(t) = prev else { panic!() };
140                Some(t)
141            }
142            TaskQueue::Many(q) => q.pop_front(),
143        }
144    }
145
146    fn is_empty(&self) -> bool {
147        match self {
148            TaskQueue::Empty => true,
149            TaskQueue::Single(_) => false,
150            TaskQueue::Many(q) => q.is_empty(),
151        }
152    }
153}
154
/// A group of pending tasks all targeting the same block hash, together with the
/// hashes of blocks whose processing awaits this group's completion.
struct BlockTaskGroup {
    // Queue of tasks within this group (where all belong to the same hash)
    tasks: TaskQueue,

    // A list of block hashes depending on the completion of this task group
    dependent_tasks: Vec<TaskId>,
}
162
163impl BlockTaskGroup {
164    fn new(task: BlockTaskInternal) -> Self {
165        Self { tasks: TaskQueue::new(task), dependent_tasks: Vec::new() }
166    }
167}
168
/// A concurrent data structure for managing block processing tasks and their DAG dependencies
pub(crate) struct BlockTaskDependencyManager {
    /// Holds pending block hashes and their corresponding tasks
    pending: Mutex<HashMap<Hash, BlockTaskGroup>>,

    // Used to signal that workers are idle (notified when `pending` drains; see `end`
    // and `wait_for_idle`)
    idle_signal: Condvar,
}
177
178impl BlockTaskDependencyManager {
179    pub fn new() -> Self {
180        Self { pending: Mutex::new(HashMap::new()), idle_signal: Condvar::new() }
181    }
182
183    /// Registers the `(task, result_transmitter)` pair as a pending task. If a task with the same
184    /// hash is already pending and has a corresponding internal task group, the task group is updated
185    /// with the additional task and the function returns `None` indicating that the task shall
186    /// not be queued for processing yet. The function is expected to be called by a single worker
187    /// controlling the reception of block processing tasks.
188    pub fn register(
189        &self,
190        task: BlockTask,
191        block_result_transmitter: BlockResultSender,
192        virtual_state_result_transmitter: BlockResultSender,
193    ) -> Option<TaskId> {
194        let mut pending = self.pending.lock();
195        let hash = task.block().hash();
196        match pending.entry(hash) {
197            Vacant(e) => {
198                e.insert(BlockTaskGroup::new(BlockTaskInternal::new(
199                    task,
200                    block_result_transmitter,
201                    virtual_state_result_transmitter,
202                )));
203                Some(hash)
204            }
205            e => {
206                e.and_modify(|g| {
207                    g.tasks.push_back(BlockTaskInternal::new(task, block_result_transmitter, virtual_state_result_transmitter));
208                });
209                None
210            }
211        }
212    }
213
214    /// To be called by worker threads wanting to begin a processing task which was
215    /// previously registered through `self.register`. If any of the direct parents `parent` of
216    /// this task id are in `pending` state, the task is queued as a dependency to the `parent` task
217    /// and wil be re-evaluated once that task completes -- in which case the function will return `None`.
218    pub fn try_begin(&self, task_id: TaskId) -> Option<BlockTask> {
219        // Lock the pending map. The contention around the lock is
220        // expected to be negligible in task processing time
221        let mut pending = self.pending.lock();
222        let group = pending.get(&task_id).expect("try_begin expects a task group");
223        let internal_task = group.tasks.front().expect("try_begin expects a task");
224        let header = internal_task.task.as_ref().expect("task is expected to not be taken").block().header.clone();
225        for parent in header.direct_parents().iter() {
226            if let Some(parent_task) = pending.get_mut(parent) {
227                parent_task.dependent_tasks.push(task_id);
228                return None; // The block will be reprocessed once the pending parent completes processing
229            }
230        }
231        // Re-access and take the inner task (now with mutable access)
232        Some(pending.get_mut(&task_id).unwrap().tasks.front_mut().unwrap().task.take().unwrap())
233    }
234
235    /// Report the completion of a processing task. Signals idleness if pending task list is emptied.
236    /// The function passes the `task` and the `result_transmitter` to the
237    /// provided `callback` function (note that `callback` is called under the internal lock),
238    /// and returns a list of `dependent_tasks` which should be requeued to workers.
239    pub fn end<F>(&self, task: BlockTask, callback: F) -> Vec<TaskId>
240    where
241        F: Fn(BlockTask, BlockResultSender, BlockResultSender),
242    {
243        let task_id = task.block().hash();
244        // Re-lock for post-processing steps
245        let mut pending = self.pending.lock();
246
247        let Occupied(mut entry) = pending.entry(task_id) else { panic!("processed task is expected to have an entry") };
248        let internal_task = entry.get_mut().tasks.pop_front().expect("same task from try_begin is expected");
249
250        // If this task group is not empty, we return the same hash in order for the next task in
251        // the group to be queued, otherwise we return the dependent tasks
252        let next_tasks = if entry.get().tasks.is_empty() { entry.remove().dependent_tasks } else { vec![task_id] };
253
254        // We expect the inner task to be taken by `try_begin`
255        assert!(internal_task.task.is_none());
256
257        // Callback within the lock
258        callback(task, internal_task.block_result_transmitter, internal_task.virtual_state_result_transmitter);
259
260        if pending.is_empty() {
261            self.idle_signal.notify_one();
262        }
263
264        next_tasks
265    }
266
267    /// Wait until all pending tasks are completed and workers are idle.
268    pub fn wait_for_idle(&self) {
269        let mut pending = self.pending.lock();
270        if !pending.is_empty() {
271            self.idle_signal.wait(&mut pending);
272        }
273    }
274}