1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
use std::sync::Arc;
use crate::{is_aggregated, is_non_aggregated, sv1::sv1_server::sv1_server::PendingTargetUpdate};
use stratum_apps::{
stratum_core::{
bitcoin::Target,
channels_sv2::{target::hash_rate_to_target, Vardiff},
mining_sv2::{SetTarget, UpdateChannel},
parsers_sv2::Mining,
stratum_translation::sv2_to_sv1::build_sv1_set_difficulty_from_sv2_target,
},
utils::types::{ChannelId, DownstreamId, Hashrate},
};
use tracing::{debug, error, info, trace, warn};
use crate::sv1::Sv1Server;
/// Snapshot of the aggregated-channel state taken when notifying the upstream
/// after a downstream state change (e.g. a disconnect).
enum AggregatedSnapshot {
    /// At least one downstream remains: carries the sum of all downstream
    /// hashrates and the minimum target among them.
    Active {
        total_hashrate: Hashrate,
        min_target: Target,
    },
    /// No downstreams are connected anymore.
    NoDownstreams,
}
#[cfg_attr(not(test), hotpath::measure_all)]
impl Sv1Server {
/// Spawns the variable difficulty adjustment loop.
///
/// Runs forever: every 60 seconds it re-evaluates the difficulty state of
/// every connected downstream via `handle_vardiff_updates`.
pub async fn spawn_vardiff_loop(self: Arc<Self>) {
    info!("Variable difficulty adjustment enabled - starting vardiff loop");
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
    loop {
        interval.tick().await;
        info!("Starting vardiff loop for downstreams");
        self.handle_vardiff_updates().await;
    }
}
/// Handles variable difficulty adjustments for all connected downstreams.
///
/// This method implements the core vardiff logic:
/// 1. For each downstream, calculate if a target update is needed
/// 2. Always send UpdateChannel to keep upstream informed
/// 3. Compare new target with upstream target to decide when to send set_difficulty:
/// - If new_target >= upstream_target: send set_difficulty immediately
/// - If new_target < upstream_target: wait for SetTarget response before sending
/// set_difficulty
/// 4. Handle aggregated vs non-aggregated modes for UpdateChannel messages
async fn handle_vardiff_updates(&self) {
let mut immediate_updates = Vec::new();
let mut all_updates = Vec::new(); // All updates will generate UpdateChannel messages
for vardiff_key_pair in self.vardiff.iter() {
let downstream_id = vardiff_key_pair.key();
let vardiff = vardiff_key_pair.value();
debug!("Updating vardiff for downstream_id: {}", downstream_id);
let Some(downstream) = self.downstreams.get(downstream_id) else {
continue;
};
let (channel_id, hashrate, target, upstream_target) =
downstream.downstream_data.super_safe_lock(|data| {
// It's safe to unwrap hashrate because we know that
// the downstream has a hashrate (we are
// doing vardiff)
(
data.channel_id,
data.hashrate.unwrap(),
data.target,
data.upstream_target,
)
});
let Some(channel_id) = channel_id else {
error!("Channel id is none for downstream_id: {}", downstream_id);
continue;
};
let new_hashrate_opt = vardiff.super_safe_lock(|state| {
state.try_vardiff(hashrate, &target, self.shares_per_minute)
});
if let Ok(Some(new_hashrate)) = new_hashrate_opt {
// Calculate new target based on new hashrate
let new_target: Target =
match hash_rate_to_target(new_hashrate as f64, self.shares_per_minute as f64) {
Ok(target) => target,
Err(e) => {
error!(
"Failed to calculate target for hashrate {}: {:?}",
new_hashrate, e
);
continue;
}
};
// Always update the downstream's pending target and hashrate
if let Some(d) = self.downstreams.get(downstream_id) {
_ = d.downstream_data.safe_lock(|data| {
data.set_pending_target(new_target, d.downstream_id);
data.set_pending_hashrate(Some(new_hashrate), d.downstream_id);
});
}
// All updates will be sent as UpdateChannel messages
all_updates.push((*downstream_id, channel_id, new_target, new_hashrate));
// Determine if we should send set_difficulty immediately or wait
match upstream_target {
Some(upstream_target) => {
if new_target >= upstream_target {
// Case 1: new_target >= upstream_target, send set_difficulty
// immediately
trace!(
"✅ Target comparison: new_target ({}) >= upstream_target ({}) for downstream {}, will send set_difficulty immediately",
new_target, upstream_target, downstream_id
);
immediate_updates.push((channel_id, Some(*downstream_id), new_target));
} else {
// Case 2: new_target < upstream_target, delay set_difficulty until
// SetTarget
trace!(
"⏳ Target comparison: new_target ({}) < upstream_target ({}) for downstream {}, will delay set_difficulty until SetTarget",
new_target, upstream_target, downstream_id
);
self.pending_target_updates.super_safe_lock(|data| {
data.push(PendingTargetUpdate {
downstream_id: *downstream_id,
new_target,
new_hashrate,
})
});
}
}
None => {
// No upstream target set yet, send set_difficulty immediately as fallback
trace!(
"No upstream target set for downstream {}, will send set_difficulty immediately",
downstream_id
);
immediate_updates.push((channel_id, Some(*downstream_id), new_target));
}
}
}
}
// Send UpdateChannel messages for ALL updates (both immediate and delayed)
if !all_updates.is_empty() {
self.send_update_channel_messages(all_updates).await;
}
// Process immediate set_difficulty updates (for new_target >= upstream_target)
for (_channel_id, downstream_id, target) in immediate_updates {
// Send set_difficulty message immediately
if let Ok(set_difficulty_msg) = build_sv1_set_difficulty_from_sv2_target(target) {
let downstream_id = downstream_id.unwrap_or(0);
if let Some(sender) = self
.sv1_server_channel_state
.sv1_server_to_downstream_sender
.super_safe_lock(|downstream| downstream.get(&downstream_id).cloned())
{
if let Err(e) = sender.send(set_difficulty_msg).await {
error!(
"Failed to send immediate SetDifficulty message to downstream {}: {:?}",
downstream_id, e
);
} else {
trace!(
"Sent immediate SetDifficulty to downstream {} (new_target >= upstream_target)",
downstream_id
);
}
}
}
}
}
/// Sends UpdateChannel messages for all target updates.
///
/// Always sends UpdateChannel to keep upstream informed about target changes.
/// Dispatches on the configured mode:
/// - Aggregated: a single UpdateChannel with the minimum target and summed hashrate
/// - Non-aggregated: one UpdateChannel per downstream
async fn send_update_channel_messages(
    &self,
    // (downstream_id, channel_id, new_target, new_hashrate)
    all_updates: Vec<(DownstreamId, ChannelId, Target, Hashrate)>,
) {
    match is_aggregated() {
        true => self.send_aggregated_update_channel(all_updates).await,
        false => self.send_non_aggregated_update_channels(all_updates).await,
    }
}
/// Sends a single aggregated UpdateChannel covering ALL downstreams: the
/// nominal hashrate is the sum over every downstream and the maximum target is
/// the minimum (pending or current) target among them.
async fn send_aggregated_update_channel(
    &self,
    all_updates: Vec<(DownstreamId, ChannelId, Target, Hashrate)>,
) {
    // Nothing to do if we received no updates
    let Some((_, channel_id, _, _)) = all_updates.first() else {
        return;
    };
    let mut min_target: Option<Target> = None;
    let mut total_hashrate: Hashrate = 0.0;
    for downstream in self.downstreams.iter() {
        let downstream = downstream.value();
        downstream.downstream_data.super_safe_lock(|d| {
            // Prefer the pending values set by vardiff, fall back to current ones.
            let target = *d.pending_target.as_ref().unwrap_or(&d.target);
            let hashrate = d
                .pending_hashrate
                .unwrap_or_else(|| d.hashrate.expect("vardiff implies hashrate"));
            min_target = Some(match min_target {
                Some(current) => current.min(target),
                None => target,
            });
            total_hashrate += hashrate;
        });
    }
    // The downstream map is concurrent: it may have been emptied between the
    // updates being collected and this pass. Bail out gracefully instead of
    // panicking on a missing minimum (previously an `expect`).
    let Some(min_target) = min_target else {
        return;
    };
    let downstream_count = self.downstreams.len();
    let update_channel = UpdateChannel {
        channel_id: *channel_id,
        nominal_hash_rate: total_hashrate,
        maximum_target: min_target.to_le_bytes().into(),
    };
    debug!(
        "Sending aggregated UpdateChannel: channel_id={}, total_hashrate={}, min_target={}, downstreams={}, vardiff_updates={}",
        channel_id,
        total_hashrate,
        min_target,
        downstream_count,
        all_updates.len()
    );
    if let Err(e) = self
        .sv1_server_channel_state
        .channel_manager_sender
        .send((Mining::UpdateChannel(update_channel), None))
        .await
    {
        error!("Failed to send aggregated UpdateChannel: {:?}", e);
    }
}
/// Sends one UpdateChannel per downstream (non-aggregated mode), each carrying
/// that downstream's own new hashrate and target.
async fn send_non_aggregated_update_channels(
    &self,
    all_updates: Vec<(DownstreamId, ChannelId, Target, Hashrate)>,
) {
    for (downstream_id, channel_id, new_target, new_hashrate) in all_updates {
        debug!(
            "Sending UpdateChannel for downstream {}: channel_id={}, hashrate={}, target={}",
            downstream_id, channel_id, new_hashrate, new_target
        );
        let message = Mining::UpdateChannel(UpdateChannel {
            channel_id,
            nominal_hash_rate: new_hashrate,
            maximum_target: new_target.to_le_bytes().into(),
        });
        let send_result = self
            .sv1_server_channel_state
            .channel_manager_sender
            .send((message, None))
            .await;
        if let Err(e) = send_result {
            error!(
                "Failed to send UpdateChannel for downstream {}: {:?}",
                downstream_id, e
            );
        }
    }
}
/// Handles SetTarget messages from the ChannelManager.
///
/// Aggregated mode: Single SetTarget updates all downstreams and processes all pending updates
/// Non-aggregated mode: Each SetTarget updates one specific downstream and processes its
/// pending update
pub async fn handle_set_target_message(&self, set_target: SetTarget<'_>) {
let new_upstream_target =
Target::from_le_bytes(set_target.maximum_target.inner_as_ref().try_into().unwrap());
debug!(
"Received SetTarget for channel {}: new_upstream_target = {}",
set_target.channel_id, new_upstream_target
);
if is_aggregated() {
return self
.handle_aggregated_set_target(new_upstream_target, set_target.channel_id)
.await;
}
self.handle_non_aggregated_set_target(set_target.channel_id, new_upstream_target)
.await;
}
/// Handles SetTarget in aggregated mode.
/// Updates all downstreams and processes all pending set_difficulty messages.
async fn handle_aggregated_set_target(
    &self,
    new_upstream_target: Target,
    channel_id: ChannelId,
) {
    debug!("Aggregated mode: Updating upstream target for all downstreams");
    // All downstreams share the aggregated upstream channel, so every one of
    // them picks up the new upstream target.
    for entry in self.downstreams.iter() {
        let ds = entry.value();
        ds.downstream_data.super_safe_lock(|data| {
            data.set_upstream_target(new_upstream_target, ds.downstream_id);
        });
    }
    // Flush every pending difficulty update the new upstream target now permits.
    let applicable =
        self.get_pending_difficulty_updates(new_upstream_target, None, channel_id);
    self.send_pending_set_difficulty_messages_to_downstream(applicable)
        .await;
}
/// Handles SetTarget in non-aggregated mode.
/// Updates the specific downstream and processes its pending set_difficulty message.
async fn handle_non_aggregated_set_target(
    &self,
    channel_id: ChannelId,
    new_upstream_target: Target,
) {
    debug!(
        "Non-aggregated mode: Processing SetTarget for channel {}",
        channel_id
    );
    // Resolve which downstream owns this channel.
    let downstream_id = match self
        .channel_id_to_downstream_id
        .super_safe_lock(|map| map.get(&channel_id).cloned())
    {
        Some(id) => id,
        None => {
            warn!("No downstream found for channel {}", channel_id);
            return;
        }
    };
    // Scope the map guard so it is dropped before any await below.
    {
        let Some(downstream) = self.downstreams.get(&downstream_id) else {
            warn!("No downstream found for downstream_id {}", downstream_id);
            return;
        };
        downstream.downstream_data.super_safe_lock(|d| {
            d.set_upstream_target(new_upstream_target, downstream_id);
        });
    }
    trace!("Updated upstream target for downstream {}", downstream_id);
    let applicable = self.get_pending_difficulty_updates(
        new_upstream_target,
        Some(downstream_id),
        channel_id,
    );
    self.send_pending_set_difficulty_messages_to_downstream(applicable)
        .await;
}
/// Gets pending updates that can now be applied based on the new upstream target.
/// If downstream_id is provided, only returns updates for that specific downstream.
/// Logs an error if the upstream target is higher than any requested target.
fn get_pending_difficulty_updates(
    &self,
    new_upstream_target: Target,
    downstream_id: Option<DownstreamId>,
    channel_id: ChannelId,
) -> Vec<PendingTargetUpdate> {
    let mut applicable = Vec::new();
    self.pending_target_updates.super_safe_lock(|pending| {
        pending.retain(|update| {
            // In non-aggregated mode only the matching downstream is affected;
            // in aggregated mode (None) every pending update is considered.
            let relevant = downstream_id.map_or(true, |id| update.downstream_id == id);
            if !relevant {
                // Keep: this SetTarget is for a different downstream.
                return true;
            }
            if update.new_target >= new_upstream_target {
                // Target is acceptable, can apply immediately.
                applicable.push(update.clone());
            } else {
                // Upstream gave us a target higher than what we requested.
                error!(
                    "❌ Protocol issue: SetTarget response has target ({}) which is higher than requested target ({}) in UpdateChannel for channel {}. Ignoring this pending update for downstream {}.",
                    new_upstream_target, update.new_target, channel_id, update.downstream_id
                );
            }
            // Processed either way: drop from the pending list.
            false
        });
    });
    applicable
}
/// Sends set_difficulty messages for all applicable pending updates.
async fn send_pending_set_difficulty_messages_to_downstream(
&self,
difficulty_updates: Vec<PendingTargetUpdate>,
) {
for update in difficulty_updates {
let set_difficulty_msg =
match build_sv1_set_difficulty_from_sv2_target(update.new_target) {
Ok(msg) => msg,
Err(e) => {
error!(
"Failed to build SetDifficulty for downstream {}: {:?}",
update.downstream_id, e
);
continue;
}
};
if let Some(sender) = self
.sv1_server_channel_state
.sv1_server_to_downstream_sender
.super_safe_lock(|downstream| downstream.get(&update.downstream_id).cloned())
{
if let Err(e) = sender.send(set_difficulty_msg).await {
error!(
"Failed to send SetDifficulty to downstream {}: {:?}",
update.downstream_id, e
);
} else {
trace!("Sent SetDifficulty to downstream {}", update.downstream_id);
}
}
}
}
/// Sends an UpdateChannel message for aggregated mode when downstream state changes
/// (e.g., disconnect). Calculates total hashrate and minimum target among all remaining
/// downstreams.
pub async fn send_update_channel_on_downstream_state_change(&self) {
    if is_non_aggregated() {
        return;
    }
    // Take a single pass over the concurrent downstream map and decide whether
    // any downstream is left from what the pass actually observed. Checking
    // `is_empty()` first and then iterating is racy: a concurrent removal in
    // between would make the later `expect` on `min_target` panic.
    let mut total_hashrate: Hashrate = 0.0;
    let mut min_target: Option<Target> = None;
    for downstream in self.downstreams.iter() {
        let downstream = downstream.value();
        downstream.downstream_data.super_safe_lock(|d| {
            // Prefer pending values set by vardiff, fall back to current ones.
            let hashrate = d.pending_hashrate.unwrap_or_else(|| {
                d.hashrate
                    .expect("vardiff implies downstream must have a hashrate")
            });
            let target = *d.pending_target.as_ref().unwrap_or(&d.target);
            total_hashrate += hashrate;
            min_target = Some(match min_target {
                Some(current) => current.min(target),
                None => target,
            });
        });
    }
    let snapshot = match min_target {
        Some(min_target) => AggregatedSnapshot::Active {
            total_hashrate,
            min_target,
        },
        None => AggregatedSnapshot::NoDownstreams,
    };
    let update = match snapshot {
        AggregatedSnapshot::Active {
            total_hashrate,
            min_target,
        } => UpdateChannel {
            channel_id: 0, // ChannelManager will rewrite to upstream extended channel id
            nominal_hash_rate: total_hashrate,
            maximum_target: min_target.to_le_bytes().into(),
        },
        // No downstreams left: report zero hashrate and the easiest possible target.
        AggregatedSnapshot::NoDownstreams => UpdateChannel {
            channel_id: 0,
            nominal_hash_rate: 0.0,
            maximum_target: [0xFF; 32].into(),
        },
    };
    if let Err(e) = self
        .sv1_server_channel_state
        .channel_manager_sender
        .send((Mining::UpdateChannel(update), None))
        .await
    {
        error!(
            "Failed to send UpdateChannel after downstream state change: {:?}",
            e
        );
    }
}
}