1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
use crate::{errors::BlockProcessResult, model::stores::ghostdag::GhostdagData};
use kaspa_consensus_core::{block::Block, blockstatus::BlockStatus};
use kaspa_hashes::Hash;
use parking_lot::{Condvar, Mutex};
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap, VecDeque,
},
sync::Arc,
};
use tokio::sync::oneshot;
/// Channel sender used to transmit the final processing result of a block task back to the async caller
pub type BlockResultSender = oneshot::Sender<BlockProcessResult<BlockStatus>>;

/// Messages consumed by the block-processing worker loop
pub enum BlockProcessingMessage {
    // Request the worker to shut down
    Exit,
    // Process the given task and report the result over the provided sender
    Process(BlockTask, BlockResultSender),
}
/// An externally submitted unit of block-processing work
pub struct BlockTask {
    /// The block to process, possibly header-only
    pub block: Block,
    /// Possibly attached trusted ghostdag data - will be set only for
    /// trusted blocks arriving as part of the pruning proof
    pub trusted_ghostdag_data: Option<Arc<GhostdagData>>,
    /// A flag indicating whether to trigger virtual UTXO processing
    pub update_virtual: bool,
}
/// An internal struct used to manage a block processing task
struct BlockTaskInternal {
    // The externally accepted block task. Wrapped in `Option` so that `try_begin`
    // can `take()` it for processing while the entry remains queued until `end`
    task: Option<BlockTask>,
    // The channel sender for transmitting the processing result of this task to the async caller
    result_transmitter: BlockResultSender,
}
impl BlockTaskInternal {
fn new(task: BlockTask, result_transmitter: BlockResultSender) -> Self {
Self { task: Some(task), result_transmitter }
}
}
/// Tasks are identified by the hash of the block they process
pub(crate) type TaskId = Hash;

/// We usually only have a single task per hash. This enum optimizes for this.
enum TaskQueue {
    // No tasks
    Empty,
    // Exactly one task -- the common case; avoids allocating a `VecDeque`
    Single(BlockTaskInternal),
    // Two or more tasks in FIFO order (may also be a drained, empty deque)
    Many(VecDeque<BlockTaskInternal>),
}
impl TaskQueue {
    /// Builds a queue holding exactly the provided task
    fn new(task: BlockTaskInternal) -> Self {
        Self::Single(task)
    }

    /// Appends a task at the back, upgrading `Single` to `Many` on demand
    fn push_back(&mut self, task: BlockTaskInternal) {
        // Temporarily take ownership of the current state so we can move out of it
        match std::mem::replace(self, TaskQueue::Empty) {
            TaskQueue::Empty => *self = TaskQueue::Single(task),
            TaskQueue::Single(existing) => {
                let mut queue = VecDeque::with_capacity(2);
                queue.push_back(existing);
                queue.push_back(task);
                *self = TaskQueue::Many(queue);
            }
            TaskQueue::Many(mut queue) => {
                queue.push_back(task);
                *self = TaskQueue::Many(queue);
            }
        }
    }

    /// Borrows the front task, if any
    fn front(&self) -> Option<&BlockTaskInternal> {
        match self {
            TaskQueue::Empty => None,
            TaskQueue::Single(task) => Some(task),
            TaskQueue::Many(queue) => queue.front(),
        }
    }

    /// Mutably borrows the front task, if any
    fn front_mut(&mut self) -> Option<&mut BlockTaskInternal> {
        match self {
            TaskQueue::Empty => None,
            TaskQueue::Single(task) => Some(task),
            TaskQueue::Many(queue) => queue.front_mut(),
        }
    }

    /// Removes and returns the front task, if any
    fn pop_front(&mut self) -> Option<BlockTaskInternal> {
        match std::mem::replace(self, TaskQueue::Empty) {
            TaskQueue::Empty => None,
            TaskQueue::Single(task) => Some(task),
            TaskQueue::Many(mut queue) => {
                let task = queue.pop_front();
                // Keep the `Many` state (possibly with an empty deque), matching prior behavior
                *self = TaskQueue::Many(queue);
                task
            }
        }
    }

    /// Returns true when no tasks remain (a drained `Many` counts as empty)
    fn is_empty(&self) -> bool {
        matches!(self, TaskQueue::Empty) || matches!(self, TaskQueue::Many(queue) if queue.is_empty())
    }
}
/// A group of tasks all targeting the same block hash, plus the hashes waiting on its completion
struct BlockTaskGroup {
    // Queue of tasks within this group (where all belong to the same hash)
    tasks: TaskQueue,
    // A list of block hashes depending on the completion of this task group
    dependent_tasks: Vec<TaskId>,
}
impl BlockTaskGroup {
fn new(task: BlockTaskInternal) -> Self {
Self { tasks: TaskQueue::new(task), dependent_tasks: Vec::new() }
}
}
/// A concurrent data structure for managing block processing tasks and their DAG dependencies
pub(crate) struct BlockTaskDependencyManager {
    /// Holds pending block hashes and their corresponding task groups
    pending: Mutex<HashMap<Hash, BlockTaskGroup>>,
    // Used to signal that workers are idle -- notified when `pending` drains to empty
    idle_signal: Condvar,
}
impl BlockTaskDependencyManager {
    /// Creates an empty manager with no pending tasks
    pub fn new() -> Self {
        Self { pending: Mutex::new(HashMap::new()), idle_signal: Condvar::new() }
    }

    /// Registers the `(task, result_transmitter)` pair as a pending task. If a task with the same
    /// hash is already pending and has a corresponding internal task group, the task group is updated
    /// with the additional task and the function returns `None` indicating that the task shall
    /// not be queued for processing yet. The function is expected to be called by a single worker
    /// controlling the reception of block processing tasks.
    pub fn register(&self, task: BlockTask, result_transmitter: BlockResultSender) -> Option<TaskId> {
        let mut pending = self.pending.lock();
        let hash = task.block.hash();
        match pending.entry(hash) {
            Vacant(e) => {
                e.insert(BlockTaskGroup::new(BlockTaskInternal::new(task, result_transmitter)));
                Some(hash)
            }
            Occupied(mut e) => {
                // A group for this hash is already pending -- queue this task behind it and
                // signal the caller not to schedule it (it will be processed in turn)
                e.get_mut().tasks.push_back(BlockTaskInternal::new(task, result_transmitter));
                None
            }
        }
    }

    /// To be called by worker threads wanting to begin a processing task which was
    /// previously registered through `self.register`. If any of the direct parents `parent` of
    /// this task id are in `pending` state, the task is queued as a dependency to the `parent` task
    /// and will be re-evaluated once that task completes -- in which case the function will return `None`.
    pub fn try_begin(&self, task_id: TaskId) -> Option<BlockTask> {
        // Lock the pending map. The contention around the lock is
        // expected to be negligible in task processing time
        let mut pending = self.pending.lock();
        let group = pending.get(&task_id).expect("try_begin expects a task group");
        let internal_task = group.tasks.front().expect("try_begin expects a task");
        // Clone the header so we can drop the shared borrow before mutating `pending` below
        let header = internal_task.task.as_ref().expect("task is expected to not be taken").block.header.clone();
        for parent in header.direct_parents().iter() {
            if let Some(parent_task) = pending.get_mut(parent) {
                parent_task.dependent_tasks.push(task_id);
                return None; // The block will be reprocessed once the pending parent completes processing
            }
        }
        // Re-access and take the inner task (now with mutable access)
        Some(pending.get_mut(&task_id).unwrap().tasks.front_mut().unwrap().task.take().unwrap())
    }

    /// Report the completion of a processing task. Signals idleness if pending task list is emptied.
    /// The function passes the `task` and the `result_transmitter` to the
    /// provided `callback` function (note that `callback` is called under the internal lock),
    /// and returns a list of `dependent_tasks` which should be requeued to workers.
    pub fn end<F>(&self, task: BlockTask, callback: F) -> Vec<TaskId>
    where
        // FnOnce is sufficient (and more general than Fn): the callback is invoked exactly once
        F: FnOnce(BlockTask, BlockResultSender),
    {
        let task_id = task.block.hash();
        // Re-lock for post-processing steps
        let mut pending = self.pending.lock();
        let Occupied(mut entry) = pending.entry(task_id) else { panic!("processed task is expected to have an entry") };
        let internal_task = entry.get_mut().tasks.pop_front().expect("same task from try_begin is expected");
        // If this task group is not empty, we return the same hash in order for the next task in
        // the group to be queued, otherwise we return the dependent tasks
        let next_tasks = if entry.get().tasks.is_empty() { entry.remove().dependent_tasks } else { vec![task_id] };
        // We expect the inner task to be taken by `try_begin`
        assert!(internal_task.task.is_none());
        // Callback within the lock
        callback(task, internal_task.result_transmitter);
        if pending.is_empty() {
            self.idle_signal.notify_one();
        }
        next_tasks
    }

    /// Wait until all pending tasks are completed and workers are idle.
    pub fn wait_for_idle(&self) {
        let mut pending = self.pending.lock();
        // Loop rather than a single wait: between `notify_one` and this thread re-acquiring
        // the lock, `register` may repopulate `pending`, so the wake condition must be rechecked
        while !pending.is_empty() {
            self.idle_signal.wait(&mut pending);
        }
    }
}