1use crate::{
2 fuel_core_graphql_api::ports::ConsensusModulePort,
3 service::adapters::{
4 BlockImporterAdapter,
5 BlockProducerAdapter,
6 P2PAdapter,
7 PoAAdapter,
8 TxPoolAdapter,
9 },
10};
11use anyhow::anyhow;
12use fuel_core_importer::ports::{
13 BlockReconciliationWritePort,
14 ImporterDatabase,
15};
16use fuel_core_metrics::poa_metrics::poa_metrics;
17use fuel_core_poa::{
18 ports::{
19 BlockImporter,
20 BlockReconciliationReadPort,
21 LeaderState,
22 P2pPort,
23 PredefinedBlocks,
24 TransactionPool,
25 TransactionsSource,
26 },
27 service::{
28 Mode,
29 SharedState,
30 },
31};
32use fuel_core_services::stream::BoxStream;
33use fuel_core_storage::transactional::Changes;
34use fuel_core_types::{
35 blockchain::{
36 SealedBlock,
37 block::Block,
38 primitives::BlockId,
39 },
40 fuel_types::BlockHeight,
41 services::{
42 block_importer::{
43 BlockImportInfo,
44 UncommittedResult as UncommittedImporterResult,
45 },
46 executor::UncommittedResult,
47 },
48 tai64::Tai64,
49};
50use std::{
51 collections::HashMap,
52 path::{
53 Path,
54 PathBuf,
55 },
56 time::Duration,
57};
58use tokio::{
59 sync::{
60 Mutex,
61 watch,
62 },
63 time::{
64 Instant,
65 sleep,
66 timeout,
67 },
68};
69use tokio_stream::{
70 StreamExt,
71 wrappers::BroadcastStream,
72};
73use tracing::error;
74
75pub mod pre_confirmation_signature;
76
77const CHECK_LEASE_OWNER_SCRIPT: &str = include_str!(concat!(
78 env!("CARGO_MANIFEST_DIR"),
79 "/redis_leader_lease_adapter_scripts/check_lease_owner.lua"
80));
81
82const RELEASE_LOCK_SCRIPT: &str = include_str!(concat!(
83 env!("CARGO_MANIFEST_DIR"),
84 "/redis_leader_lease_adapter_scripts/release_lock.lua"
85));
86
87const PROMOTE_LEADER_SCRIPT: &str = include_str!(concat!(
88 env!("CARGO_MANIFEST_DIR"),
89 "/redis_leader_lease_adapter_scripts/promote_leader.lua"
90));
91
92const WRITE_BLOCK_SCRIPT: &str = include_str!(concat!(
93 env!("CARGO_MANIFEST_DIR"),
94 "/redis_leader_lease_adapter_scripts/write_block.lua"
95));
96
97const READ_STREAM_ENTRIES_SCRIPT: &str = include_str!(concat!(
98 env!("CARGO_MANIFEST_DIR"),
99 "/redis_leader_lease_adapter_scripts/read_stream_entries.lua"
100));
101
102const READ_LATEST_STREAM_ENTRY_SCRIPT: &str = include_str!(concat!(
103 env!("CARGO_MANIFEST_DIR"),
104 "/redis_leader_lease_adapter_scripts/read_latest_stream_entry.lua"
105));
106
107struct RedisNode {
108 redis_client: redis::Client,
109 cached_connection: Mutex<Option<redis::aio::MultiplexedConnection>>,
110}
111
112impl Clone for RedisNode {
113 fn clone(&self) -> Self {
114 Self {
115 redis_client: self.redis_client.clone(),
116 cached_connection: Mutex::new(None),
117 }
118 }
119}
120
121pub struct RedisLeaderLeaseAdapter {
122 redis_nodes: Vec<RedisNode>,
123 quorum: usize,
124 quorum_disruption_budget: u32,
125 lease_key: String,
126 epoch_key: String,
127 block_stream_key: String,
128 lease_owner_token: String,
129 drop_release_guard: std::sync::Arc<()>,
130 current_epoch_token: std::sync::Arc<std::sync::Mutex<Option<u64>>>,
131 lease_ttl_millis: u64,
132 lease_drift_millis: u64,
133 node_timeout: Duration,
134 retry_delay_millis: u64,
135 max_retry_delay_offset_millis: u64,
136 max_attempts: usize,
137 stream_max_len: u32,
138}
139
140impl Clone for RedisLeaderLeaseAdapter {
141 fn clone(&self) -> Self {
142 Self {
143 redis_nodes: self.redis_nodes.clone(),
144 quorum: self.quorum,
145 quorum_disruption_budget: self.quorum_disruption_budget,
146 lease_key: self.lease_key.clone(),
147 epoch_key: self.epoch_key.clone(),
148 block_stream_key: self.block_stream_key.clone(),
149 lease_owner_token: self.lease_owner_token.clone(),
150 drop_release_guard: self.drop_release_guard.clone(),
151 current_epoch_token: self.current_epoch_token.clone(),
152 lease_ttl_millis: self.lease_ttl_millis,
153 lease_drift_millis: self.lease_drift_millis,
154 node_timeout: self.node_timeout,
155 retry_delay_millis: self.retry_delay_millis,
156 max_retry_delay_offset_millis: self.max_retry_delay_offset_millis,
157 max_attempts: self.max_attempts,
158 stream_max_len: self.stream_max_len,
159 }
160 }
161}
162
/// Reconciliation adapter that tracks no lease and no block stream.
///
/// Its `BlockReconciliationReadPort` implementation always reports a
/// reconciled leader.
#[derive(Clone, Default)]
pub struct NoopReconciliationAdapter;
165
/// Reconciliation backend: either the Redis-backed leader lease or a no-op.
// The Redis variant is far larger than the unit-like `Noop` variant. The size
// difference is tolerated instead of boxing the adapter.
// NOTE(review): presumably acceptable because only a few of these values are
// constructed. Confirm before copying the pattern.
#[allow(clippy::large_enum_variant)]
pub enum ReconciliationAdapter {
    /// Quorum leader lease and block stream stored in Redis.
    Redis(RedisLeaderLeaseAdapter),
    /// No leader election and no reconciliation.
    Noop(NoopReconciliationAdapter),
}
171
172impl RedisLeaderLeaseAdapter {
173 fn calculate_quorum(redis_nodes_len: usize, quorum_disruption_budget: u32) -> usize {
174 let majority = redis_nodes_len
175 .checked_div(2)
176 .unwrap_or(0)
177 .saturating_add(1);
178 let disruption_budget = usize::try_from(quorum_disruption_budget).unwrap_or(0);
179 majority
180 .saturating_add(disruption_budget)
181 .min(redis_nodes_len)
182 }
183
184 #[allow(clippy::too_many_arguments)]
185 pub fn new(
186 redis_urls: Vec<String>,
187 lease_key: String,
188 lease_ttl: Duration,
189 node_timeout: Duration,
190 retry_delay: Duration,
191 max_retry_delay_offset: Duration,
192 max_attempts: u32,
193 stream_max_len: u32,
194 ) -> anyhow::Result<Self> {
195 let redis_nodes = redis_urls
196 .into_iter()
197 .map(|redis_url| {
198 redis::Client::open(redis_url).map(|redis_client| RedisNode {
199 redis_client,
200 cached_connection: Mutex::new(None),
201 })
202 })
203 .collect::<Result<Vec<_>, _>>()?;
204 if redis_nodes.is_empty() {
205 return Err(anyhow!(
206 "At least one redis url is required for leader lock"
207 ));
208 }
209 let quorum_disruption_budget = 0u32;
210 let quorum = Self::calculate_quorum(redis_nodes.len(), quorum_disruption_budget);
211 let lease_ttl_millis = u64::try_from(lease_ttl.as_millis())?;
212 let retry_delay_millis = u64::try_from(retry_delay.as_millis())?;
213 let max_retry_delay_offset_millis =
214 u64::try_from(max_retry_delay_offset.as_millis())?;
215 let max_attempts = usize::try_from(max_attempts)?.max(1);
216 let lease_owner_token = uuid::Uuid::new_v4().to_string();
217 let epoch_key = format!("{lease_key}:epoch:token");
218 let block_stream_key = format!("{lease_key}:block:stream");
219 let lease_drift_millis = lease_ttl_millis
220 .checked_div(100)
221 .unwrap_or(0)
222 .saturating_add(2);
223 Ok(Self {
224 redis_nodes,
225 quorum,
226 quorum_disruption_budget,
227 lease_key,
228 epoch_key,
229 block_stream_key,
230 lease_owner_token,
231 drop_release_guard: std::sync::Arc::new(()),
232 current_epoch_token: std::sync::Arc::new(std::sync::Mutex::new(None)),
233 lease_ttl_millis,
234 lease_drift_millis,
235 node_timeout,
236 retry_delay_millis,
237 max_retry_delay_offset_millis,
238 max_attempts,
239 stream_max_len,
240 })
241 }
242
243 pub fn with_quorum_disruption_budget(
244 mut self,
245 quorum_disruption_budget: u32,
246 ) -> Self {
247 self.quorum_disruption_budget = quorum_disruption_budget;
248 self.quorum =
249 Self::calculate_quorum(self.redis_nodes.len(), quorum_disruption_budget);
250 self
251 }
252
253 async fn multiplexed_connection(
254 &self,
255 redis_node: &RedisNode,
256 ) -> anyhow::Result<redis::aio::MultiplexedConnection> {
257 if let Some(connection) =
258 redis_node.cached_connection.lock().await.as_ref().cloned()
259 {
260 return Ok(connection);
261 }
262
263 let new_connection = timeout(
264 self.node_timeout,
265 redis_node.redis_client.get_multiplexed_async_connection(),
266 )
267 .await
268 .map_err(|_| anyhow!("Timed out while connecting to redis leader-lock node"))??;
269 let mut cached_connection = redis_node.cached_connection.lock().await;
270 if let Some(connection) = cached_connection.as_ref().cloned() {
271 return Ok(connection);
272 }
273 *cached_connection = Some(new_connection.clone());
274 Ok(new_connection)
275 }
276
277 async fn clear_cached_connection(&self, redis_node: &RedisNode) {
278 let mut cached_connection = redis_node.cached_connection.lock().await;
279 *cached_connection = None;
280 poa_metrics().connection_reset_total.inc();
281 }
282
283 async fn check_lease_owner_on_node(&self, redis_node: &RedisNode) -> bool {
284 let mut connection = match self.multiplexed_connection(redis_node).await {
285 Ok(connection) => connection,
286 Err(_) => return false,
287 };
288 let is_owner = timeout(
289 self.node_timeout,
290 redis::Script::new(CHECK_LEASE_OWNER_SCRIPT)
291 .key(&self.lease_key)
292 .arg(&self.lease_owner_token)
293 .invoke_async::<i32>(&mut connection),
294 )
295 .await;
296 match is_owner {
297 Ok(Ok(is_owner)) => is_owner == 1,
298 Err(_) => {
299 self.clear_cached_connection(redis_node).await;
300 false
301 }
302 Ok(Err(_)) => {
303 self.clear_cached_connection(redis_node).await;
304 false
305 }
306 }
307 }
308
309 async fn promote_leader_on_node(
310 &self,
311 redis_node: &RedisNode,
312 ) -> anyhow::Result<Option<u64>> {
313 let mut connection = match self.multiplexed_connection(redis_node).await {
314 Ok(connection) => connection,
315 Err(_) => return Ok(None),
316 };
317 let promoted = timeout(
318 self.node_timeout,
319 redis::Script::new(PROMOTE_LEADER_SCRIPT)
320 .key(&self.lease_key)
321 .key(&self.epoch_key)
322 .arg(&self.lease_owner_token)
323 .arg(self.lease_ttl_millis)
324 .invoke_async::<u64>(&mut connection),
325 )
326 .await;
327 match promoted {
328 Ok(Ok(token)) => Ok(Some(token)),
329 Ok(Err(err)) => {
330 if err.to_string().contains("LOCK_HELD:") {
331 return Ok(None);
332 }
333 self.clear_cached_connection(redis_node).await;
334 Ok(None)
335 }
336 Err(_) => {
337 self.clear_cached_connection(redis_node).await;
338 Ok(None)
339 }
340 }
341 }
342
343 async fn release_lease_on_node(&self, redis_node: &RedisNode) -> bool {
344 let mut connection = match self.multiplexed_connection(redis_node).await {
345 Ok(connection) => connection,
346 Err(_) => return false,
347 };
348 let released = timeout(
349 self.node_timeout,
350 redis::Script::new(RELEASE_LOCK_SCRIPT)
351 .key(&self.lease_key)
352 .arg(&self.lease_owner_token)
353 .invoke_async::<i32>(&mut connection),
354 )
355 .await;
356 match released {
357 Ok(Ok(released)) => released == 1,
358 Err(_) => {
359 self.clear_cached_connection(redis_node).await;
360 false
361 }
362 Ok(Err(_)) => {
363 self.clear_cached_connection(redis_node).await;
364 false
365 }
366 }
367 }
368
369 fn quorum_reached(&self, success_count: usize) -> bool {
370 success_count >= self.quorum
371 }
372
373 fn calculate_remaining_validity_millis(&self, elapsed_millis: u64) -> u64 {
374 self.lease_ttl_millis
375 .saturating_sub(elapsed_millis.saturating_add(self.lease_drift_millis))
376 }
377
378 fn random_retry_delay_offset_millis(&self) -> u64 {
379 if self.max_retry_delay_offset_millis == 0 {
380 return 0;
381 }
382 rand::random::<u64>()
383 .checked_rem(self.max_retry_delay_offset_millis.saturating_add(1))
384 .unwrap_or(0)
385 }
386
387 async fn release_lease_on_all_nodes(&self) {
388 let _ = futures::future::join_all(
389 self.redis_nodes
390 .iter()
391 .map(|redis_node| self.release_lease_on_node(redis_node)),
392 )
393 .await;
394 }
395
396 async fn delay_next_retry(&self) {
397 let retry_delay_millis = self
398 .retry_delay_millis
399 .saturating_add(self.random_retry_delay_offset_millis());
400 sleep(Duration::from_millis(retry_delay_millis)).await;
401 }
402
403 async fn has_lease_owner_quorum(&self) -> anyhow::Result<bool> {
404 let ownership = futures::future::join_all(
405 self.redis_nodes
406 .iter()
407 .map(|redis_node| self.check_lease_owner_on_node(redis_node)),
408 )
409 .await;
410 let owner_count = ownership.iter().filter(|&&is_owner| is_owner).count();
411 if !self.quorum_reached(owner_count) {
412 tracing::info!(
413 owner_count,
414 quorum = self.quorum,
415 redis_nodes = self.redis_nodes.len(),
416 current_epoch_token = ?self.current_epoch_token_value(),
417 lease_key = %self.lease_key,
418 "Leader lock quorum not held by this authority"
419 );
420 return Ok(false);
421 }
422
423 let non_owned: Vec<&RedisNode> = self
430 .redis_nodes
431 .iter()
432 .zip(ownership.iter())
433 .filter(|(_, is_owner)| !**is_owner)
434 .map(|(node, _)| node)
435 .collect();
436
437 if !non_owned.is_empty() {
438 let results = futures::future::join_all(
439 non_owned
440 .into_iter()
441 .map(|redis_node| self.promote_leader_on_node(redis_node)),
442 )
443 .await;
444
445 if let Some(max_new) =
446 results.into_iter().filter_map(|r| r.ok().flatten()).max()
447 && let Ok(mut epoch) = self.current_epoch_token.lock()
448 {
449 let current = epoch.unwrap_or(0);
450 if max_new > current {
451 tracing::debug!(
452 old_epoch = current,
453 new_epoch = max_new,
454 "Adopted higher epoch from lock expansion"
455 );
456 *epoch = Some(max_new);
457 }
458 }
459 }
460
461 Ok(true)
462 }
463
464 async fn acquire_lease_if_free(&self) -> anyhow::Result<bool> {
465 let promotion_start = std::time::Instant::now();
466 for attempt_index in 0..self.max_attempts {
467 let start = std::time::Instant::now();
468 let promoted_nodes = futures::future::join_all(
469 self.redis_nodes
470 .iter()
471 .map(|redis_node| self.promote_leader_on_node(redis_node)),
472 )
473 .await;
474 let promoted_tokens = promoted_nodes
475 .into_iter()
476 .filter_map(|token| token.ok().flatten())
477 .collect::<Vec<_>>();
478 let acquired_count = promoted_tokens.len();
479 let elapsed_millis =
480 u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
481 let validity_millis =
482 self.calculate_remaining_validity_millis(elapsed_millis);
483 tracing::debug!(
484 attempt = attempt_index.saturating_add(1),
485 max_attempts = self.max_attempts,
486 acquired_count,
487 quorum = self.quorum,
488 redis_nodes = self.redis_nodes.len(),
489 elapsed_millis,
490 validity_millis,
491 promoted_tokens = ?promoted_tokens,
492 current_epoch_token = ?self.current_epoch_token_value(),
493 lease_key = %self.lease_key,
494 "Leader lock promotion attempt finished"
495 );
496 if self.quorum_reached(acquired_count) && validity_millis > 0 {
497 if promoted_tokens.len() > 1
499 && let (Some(min_tok), Some(max_tok)) = (
500 promoted_tokens.iter().copied().min(),
501 promoted_tokens.iter().copied().max(),
502 )
503 {
504 poa_metrics().epoch_max_drift.set(
505 i64::try_from(max_tok.saturating_sub(min_tok))
506 .unwrap_or(i64::MAX),
507 );
508 }
509 if let Some(max_token) = promoted_tokens.into_iter().max() {
510 let mut current_epoch_token = self
511 .current_epoch_token
512 .lock()
513 .map_err(|e| anyhow!("epoch token lock poisoned: {}", e))?;
514 *current_epoch_token = Some(max_token);
515 poa_metrics()
516 .leader_epoch
517 .set(i64::try_from(max_token).unwrap_or(i64::MAX));
518 }
519 poa_metrics().promotion_success_total.inc();
520 poa_metrics()
521 .promotion_duration_s
522 .observe(promotion_start.elapsed().as_secs_f64());
523 return Ok(true);
524 }
525 self.release_lease_on_all_nodes().await;
526 let is_last_attempt = attempt_index.saturating_add(1) == self.max_attempts;
527 if !is_last_attempt {
528 self.delay_next_retry().await;
529 }
530 }
531 tracing::warn!(
532 max_attempts = self.max_attempts,
533 quorum = self.quorum,
534 redis_nodes = self.redis_nodes.len(),
535 current_epoch_token = ?self.current_epoch_token_value(),
536 lease_key = %self.lease_key,
537 "Leader lock promotion failed; cannot produce block"
538 );
539 poa_metrics().promotion_failure_total.inc();
540 poa_metrics()
541 .promotion_duration_s
542 .observe(promotion_start.elapsed().as_secs_f64());
543 Ok(false)
544 }
545
546 async fn release_lease_on_client(
547 redis_client: redis::Client,
548 lease_key: String,
549 lease_owner_token: String,
550 node_timeout: Duration,
551 ) {
552 let connection = timeout(
553 node_timeout,
554 redis_client.get_multiplexed_async_connection(),
555 )
556 .await;
557 let mut connection = match connection {
558 Ok(Ok(connection)) => connection,
559 Err(_) => return,
560 Ok(Err(_)) => return,
561 };
562 let _ = timeout(
563 node_timeout,
564 redis::Script::new(RELEASE_LOCK_SCRIPT)
565 .key(lease_key)
566 .arg(lease_owner_token)
567 .invoke_async::<i32>(&mut connection),
568 )
569 .await;
570 }
571
572 async fn release_lease_on_clients(
573 redis_clients: Vec<redis::Client>,
574 lease_key: String,
575 lease_owner_token: String,
576 node_timeout: Duration,
577 ) {
578 let _ =
579 futures::future::join_all(redis_clients.into_iter().map(|redis_client| {
580 Self::release_lease_on_client(
581 redis_client,
582 lease_key.clone(),
583 lease_owner_token.clone(),
584 node_timeout,
585 )
586 }))
587 .await;
588 }
589
590 fn release_lease_on_clients_sync(
591 redis_clients: Vec<redis::Client>,
592 lease_key: String,
593 lease_owner_token: String,
594 ) {
595 redis_clients.into_iter().for_each(|redis_client| {
596 let Ok(mut connection) = redis_client.get_connection() else {
597 return;
598 };
599 let _ = redis::Script::new(RELEASE_LOCK_SCRIPT)
600 .key(&lease_key)
601 .arg(&lease_owner_token)
602 .invoke::<i32>(&mut connection);
603 });
604 }
605
606 async fn read_latest_stream_entry_on_node(
607 &self,
608 redis_node: &RedisNode,
609 ) -> anyhow::Result<Option<(u32, String)>> {
610 let mut connection = self.multiplexed_connection(redis_node).await?;
611 let latest_entry = timeout(
612 self.node_timeout,
613 redis::Script::new(READ_LATEST_STREAM_ENTRY_SCRIPT)
614 .key(&self.block_stream_key)
615 .invoke_async::<Vec<String>>(&mut connection),
616 )
617 .await;
618 match latest_entry {
619 Err(_) => {
620 self.clear_cached_connection(redis_node).await;
621 Err(anyhow!(
622 "Timed out reading latest stream entry from Redis node"
623 ))
624 }
625 Ok(Err(e)) => {
626 self.clear_cached_connection(redis_node).await;
627 Err(anyhow!(
628 "Failed to read latest stream entry from Redis node: {e}"
629 ))
630 }
631 Ok(Ok(entry)) => {
632 if entry.len() != 2 {
633 return Ok(None);
634 }
635 let height = entry[0]
636 .parse::<u32>()
637 .map_err(|e| anyhow!("Invalid latest stream entry height: {e}"))?;
638 Ok(Some((height, entry[1].clone())))
639 }
640 }
641 }
642
643 async fn should_reconcile_from_stream(
644 &self,
645 next_height: BlockHeight,
646 ) -> anyhow::Result<bool> {
647 let next_height = u32::from(next_height);
648 let latest_results = futures::future::join_all(
649 self.redis_nodes
650 .iter()
651 .map(|redis_node| self.read_latest_stream_entry_on_node(redis_node)),
652 )
653 .await;
654 let mut successful_reads = 0usize;
655 let mut failed_count = 0usize;
656 let mut nodes_indicating_backlog = 0usize;
657 for result in latest_results {
658 match result {
659 Ok(Some((latest_height, _latest_stream_id))) => {
660 successful_reads = successful_reads.saturating_add(1);
661 if latest_height >= next_height {
662 nodes_indicating_backlog =
663 nodes_indicating_backlog.saturating_add(1);
664 }
665 }
666 Ok(None) => {
667 successful_reads = successful_reads.saturating_add(1);
668 }
669 Err(e) => {
670 tracing::warn!("Redis latest stream read failed: {e}");
671 failed_count = failed_count.saturating_add(1);
672 }
673 }
674 }
675 if !self.quorum_reached(successful_reads) {
676 return Err(anyhow!(
677 "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
678 successful_reads,
679 self.redis_nodes.len(),
680 failed_count
681 ));
682 }
683 Ok(nodes_indicating_backlog > 0)
684 }
685
686 async fn read_stream_entries_on_node(
687 &self,
688 redis_node: &RedisNode,
689 next_height: u32,
690 max_entries: usize,
691 ) -> anyhow::Result<Vec<(u32, u64, SealedBlock)>> {
692 if max_entries == 0 {
693 return Ok(Vec::new());
694 }
695
696 let mut connection = self.multiplexed_connection(redis_node).await?;
697 let count = u32::try_from(max_entries).unwrap_or(u32::MAX);
698 let stream_entries = timeout(
699 self.node_timeout,
700 redis::Script::new(READ_STREAM_ENTRIES_SCRIPT)
701 .key(&self.block_stream_key)
702 .arg(next_height)
703 .arg(count)
704 .invoke_async::<Vec<(u32, u64, Vec<u8>, String)>>(&mut connection),
705 )
706 .await;
707
708 let entries = match stream_entries {
709 Err(_) => {
710 self.clear_cached_connection(redis_node).await;
711 return Err(anyhow!("Timed out reading stream entries from Redis node"));
712 }
713 Ok(Err(e)) => {
714 self.clear_cached_connection(redis_node).await;
715 return Err(anyhow!(
716 "Failed to read stream entries from Redis node: {e}"
717 ));
718 }
719 Ok(Ok(entries)) => entries,
720 };
721
722 let mut blocks = Vec::new();
723 for (height, epoch, bytes, _stream_id) in entries {
724 match postcard::from_bytes::<SealedBlock>(&bytes) {
725 Ok(block) => blocks.push((height, epoch, block)),
726 Err(e) => {
727 tracing::warn!(
728 "Skipping stream entry: failed to deserialize block at height {height}: {e}"
729 );
730 }
731 }
732 }
733
734 Ok(blocks)
735 }
736
737 async fn unreconciled_blocks(
738 &self,
739 next_height: BlockHeight,
740 ) -> anyhow::Result<Vec<SealedBlock>> {
741 if !self.should_reconcile_from_stream(next_height).await? {
742 return Ok(Vec::new());
743 }
744 let mut reconciled = Vec::new();
745 let max_reconcile_blocks_per_round =
746 usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
747 let next_height_u32 = u32::from(next_height);
748 let read_results =
749 futures::future::join_all(self.redis_nodes.iter().map(|redis_node| {
750 self.read_stream_entries_on_node(
751 redis_node,
752 next_height_u32,
753 max_reconcile_blocks_per_round,
754 )
755 }))
756 .await;
757
758 let mut successful_reads = Vec::new();
759 let mut failed_count = 0usize;
760 for result in read_results {
761 match result {
762 Ok(entries) => successful_reads.push(entries),
763 Err(e) => {
764 tracing::warn!("Redis stream read failed: {e}");
765 failed_count = failed_count.saturating_add(1);
766 }
767 }
768 }
769
770 if !self.quorum_reached(successful_reads.len()) {
771 return Err(anyhow!(
772 "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
773 successful_reads.len(),
774 self.redis_nodes.len(),
775 failed_count
776 ));
777 }
778
779 let blocks_by_node = successful_reads
780 .into_iter()
781 .map(|entries| {
782 entries.into_iter().fold(
783 HashMap::<u32, HashMap<u64, SealedBlock>>::new(),
784 |mut blocks_by_height, (height, epoch, block)| {
785 blocks_by_height
786 .entry(height)
787 .or_default()
788 .insert(epoch, block);
789 blocks_by_height
790 },
791 )
792 })
793 .collect::<Vec<_>>();
794
795 let min_stream_height = blocks_by_node
797 .iter()
798 .flat_map(|blocks_by_height| blocks_by_height.keys().copied())
799 .min();
800 if let Some(min_h) = min_stream_height {
801 let local_committed = i64::from(u32::from(next_height).saturating_sub(1));
802 let headroom = i64::from(min_h).saturating_sub(local_committed);
803 poa_metrics().stream_trim_headroom.set(headroom);
804 }
805
806 let mut current_height = u32::from(next_height);
807
808 for _ in 0..max_reconcile_blocks_per_round {
809 let nodes_with_height = blocks_by_node
810 .iter()
811 .filter(|blocks_by_height| blocks_by_height.contains_key(¤t_height))
812 .count();
813
814 tracing::debug!(
815 "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
816 blocks_by_node.len()
817 );
818
819 if nodes_with_height == 0 {
820 if reconciled.is_empty() {
821 return Err(anyhow!(
822 "Backlog unresolved at height {current_height}: \
823 stream indicates backlog but no entries found at next height"
824 ));
825 }
826 break;
827 }
828
829 let votes = blocks_by_node
836 .iter()
837 .filter_map(|blocks_by_height| blocks_by_height.get(¤t_height))
838 .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
839 .fold(
840 HashMap::<BlockId, (u64, usize, SealedBlock)>::new(),
841 |mut votes, (epoch, block)| {
842 let vote_key = block.entity.id();
843 match votes.get_mut(&vote_key) {
844 Some((max_epoch, count, _)) => {
845 *count = count.saturating_add(1);
846 if *epoch > *max_epoch {
847 *max_epoch = *epoch;
848 }
849 }
850 None => {
851 votes.insert(vote_key, (*epoch, 1, block.clone()));
852 }
853 }
854 votes
855 },
856 );
857
858 let winner = votes
859 .into_iter()
860 .max_by_key(|(_, (max_epoch, _, _))| *max_epoch)
861 .map(|(_, (_, count, block))| (count, block));
862
863 if let Some((count, block)) = winner {
864 if self.quorum_reached(count) {
865 reconciled.push(block);
867 } else {
868 tracing::info!(
873 "Repairing sub-quorum block at height {current_height} \
874 (found on {count}/{} nodes)",
875 blocks_by_node.len()
876 );
877 match self.repair_sub_quorum_block(&block, count) {
878 Ok(true) => {
879 tracing::info!(
880 "Repair succeeded — block at height {current_height} \
881 now has quorum"
882 );
883 reconciled.push(block);
884 }
885 Ok(false) => {
886 tracing::warn!(
887 "Repair failed to reach quorum at height \
888 {current_height} — will retry next round"
889 );
890 if reconciled.is_empty() {
891 return Err(anyhow!(
892 "Backlog unresolved at height {current_height}: \
893 repair failed to reach quorum"
894 ));
895 }
896 break;
897 }
898 Err(e) => {
899 tracing::warn!(
900 "Repair error at height {current_height}: {e}"
901 );
902 if reconciled.is_empty() {
903 return Err(anyhow!(
904 "Backlog unresolved at height {current_height}: \
905 repair error: {e}"
906 ));
907 }
908 break;
909 }
910 }
911 }
912 } else {
913 if reconciled.is_empty() {
914 return Err(anyhow!(
915 "Backlog unresolved at height {current_height}: \
916 no winning block candidate"
917 ));
918 }
919 break;
920 }
921
922 let Some(next) = current_height.checked_add(1) else {
923 break;
924 };
925 current_height = next;
926 }
927
928 Ok(reconciled)
929 }
930
931 async fn can_produce_block(&self) -> anyhow::Result<bool> {
932 tracing::debug!("Checking Redis leader lock");
933 if self.has_lease_owner_quorum().await? {
934 tracing::debug!(
935 quorum = self.quorum,
936 redis_nodes = self.redis_nodes.len(),
937 current_epoch_token = ?self.current_epoch_token_value(),
938 lease_key = %self.lease_key,
939 "This authority already holds leader lock quorum"
940 );
941 return Ok(true);
942 }
943 self.acquire_lease_if_free().await
944 }
945
946 async fn release_if_owner(&self) -> anyhow::Result<()> {
947 tracing::debug!("Releasing Redis leader lock");
948 if !self.has_lease_owner_quorum().await? {
949 let mut current_epoch_token = self
950 .current_epoch_token
951 .lock()
952 .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
953 *current_epoch_token = None;
954 return Ok(());
955 }
956
957 let releases = futures::future::join_all(
958 self.redis_nodes
959 .iter()
960 .map(|redis_node| self.release_lease_on_node(redis_node)),
961 )
962 .await;
963 let released_count = releases.into_iter().filter(|released| *released).count();
964 if self.quorum_reached(released_count) {
965 let mut current_epoch_token = self
966 .current_epoch_token
967 .lock()
968 .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
969 *current_epoch_token = None;
970 Ok(())
971 } else {
972 Err(anyhow!("Failed to release lease on quorum"))
973 }
974 }
975
976 fn current_epoch_token_value(&self) -> Option<u64> {
977 self.current_epoch_token
978 .lock()
979 .ok()
980 .and_then(|epoch| *epoch)
981 }
982
983 fn publish_block_on_all_nodes(
984 &self,
985 epoch: u64,
986 block: &SealedBlock,
987 block_data: &[u8],
988 ) -> Vec<anyhow::Result<WriteBlockResult>> {
989 let n = self.redis_nodes.len();
1006 let block_data: std::sync::Arc<[u8]> = block_data.into();
1007 let block_height = u32::from(*block.entity.header().height());
1008 let block_stream_key: std::sync::Arc<str> = self.block_stream_key.as_str().into();
1009 let epoch_key: std::sync::Arc<str> = self.epoch_key.as_str().into();
1010 let lease_key: std::sync::Arc<str> = self.lease_key.as_str().into();
1011 let lease_owner_token: std::sync::Arc<str> =
1012 self.lease_owner_token.as_str().into();
1013 let lease_ttl_millis = self.lease_ttl_millis;
1014 let stream_max_len = self.stream_max_len;
1015 let node_timeout = self.node_timeout;
1016
1017 let (tx, rx) =
1018 std::sync::mpsc::channel::<(usize, anyhow::Result<WriteBlockResult>)>();
1019
1020 for (idx, redis_node) in self.redis_nodes.iter().enumerate() {
1021 let client = redis_node.redis_client.clone();
1022 let tx = tx.clone();
1023 let block_data = block_data.clone();
1024 let block_stream_key = block_stream_key.clone();
1025 let epoch_key = epoch_key.clone();
1026 let lease_key = lease_key.clone();
1027 let lease_owner_token = lease_owner_token.clone();
1028 std::thread::spawn(move || {
1029 let result = Self::invoke_write_block_script(
1030 &client,
1031 node_timeout,
1032 &block_stream_key,
1033 &epoch_key,
1034 &lease_key,
1035 epoch,
1036 &lease_owner_token,
1037 block_height,
1038 &block_data,
1039 lease_ttl_millis,
1040 stream_max_len,
1041 );
1042 let _ = tx.send((idx, result));
1045 });
1046 }
1047 drop(tx);
1050
1051 let mut results: Vec<Option<anyhow::Result<WriteBlockResult>>> =
1052 (0..n).map(|_| None).collect();
1053 let mut written_count = 0usize;
1054 let mut received = 0usize;
1055
1056 while received < n {
1057 let Ok((idx, result)) = rx.recv() else {
1058 break;
1061 };
1062 if matches!(result, Ok(WriteBlockResult::Written)) {
1063 written_count = written_count.saturating_add(1);
1064 }
1065 results[idx] = Some(result);
1066 received = received.saturating_add(1);
1067
1068 if self.quorum_reached(written_count) {
1069 break;
1073 }
1074 }
1075
1076 results
1077 .into_iter()
1078 .map(|r| {
1079 r.unwrap_or_else(|| {
1080 Err(anyhow!(
1081 "publish abandoned: quorum reached before this \
1082 node responded"
1083 ))
1084 })
1085 })
1086 .collect()
1087 }
1088
1089 #[allow(clippy::too_many_arguments)]
1093 fn invoke_write_block_script(
1094 redis_client: &redis::Client,
1095 node_timeout: Duration,
1096 block_stream_key: &str,
1097 epoch_key: &str,
1098 lease_key: &str,
1099 epoch: u64,
1100 lease_owner_token: &str,
1101 block_height: u32,
1102 block_data: &[u8],
1103 lease_ttl_millis: u64,
1104 stream_max_len: u32,
1105 ) -> anyhow::Result<WriteBlockResult> {
1106 let mut connection = redis_client.get_connection_with_timeout(node_timeout)?;
1107 connection.set_read_timeout(Some(node_timeout))?;
1108 connection.set_write_timeout(Some(node_timeout))?;
1109 let lua_start = std::time::Instant::now();
1110 let write_result = redis::Script::new(WRITE_BLOCK_SCRIPT)
1111 .key(block_stream_key)
1112 .key(epoch_key)
1113 .key(lease_key)
1114 .arg(epoch)
1115 .arg(lease_owner_token)
1116 .arg(block_height)
1117 .arg(block_data)
1118 .arg(lease_ttl_millis)
1119 .arg(stream_max_len)
1120 .invoke::<String>(&mut connection);
1121 poa_metrics()
1122 .write_block_duration_s
1123 .observe(lua_start.elapsed().as_secs_f64());
1124 match write_result {
1125 Ok(_) => {
1126 poa_metrics().write_block_success_total.inc();
1127 Ok(WriteBlockResult::Written)
1128 }
1129 Err(err) if err.to_string().contains("HEIGHT_EXISTS:") => {
1130 poa_metrics().write_block_height_exists_total.inc();
1131 tracing::debug!(
1132 "write_block: height already exists (height={block_height})"
1133 );
1134 Ok(WriteBlockResult::HeightExists)
1135 }
1136 Err(err) if err.to_string().contains("FENCING_ERROR:") => {
1137 poa_metrics().write_block_fencing_error_total.inc();
1138 tracing::warn!(
1139 "write_block: fencing rejected (height={block_height}): {err}"
1140 );
1141 Ok(WriteBlockResult::FencingRejected)
1142 }
1143 Err(err) => {
1144 poa_metrics().write_block_error_total.inc();
1145 Err(err.into())
1146 }
1147 }
1148 }
1149
1150 fn repair_sub_quorum_block(
1165 &self,
1166 block: &SealedBlock,
1167 pre_existing_count: usize,
1168 ) -> anyhow::Result<bool> {
1169 let epoch = match *self
1170 .current_epoch_token
1171 .lock()
1172 .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
1173 {
1174 Some(epoch) => epoch,
1175 None => {
1176 return Err(anyhow!(
1177 "Cannot repair block because fencing token is not initialized"
1178 ));
1179 }
1180 };
1181 let block_data = postcard::to_allocvec(block)?;
1182 let mut total_with_block = pre_existing_count;
1188 for result in self.publish_block_on_all_nodes(epoch, block, &block_data) {
1189 match result {
1190 Ok(WriteBlockResult::Written) => {
1191 total_with_block = total_with_block.saturating_add(1);
1192 }
1193 Ok(WriteBlockResult::HeightExists) => {
1194 }
1198 Ok(WriteBlockResult::FencingRejected) => {
1199 return Err(anyhow!(
1201 "Lost lock during repair — another leader took over"
1202 ));
1203 }
1204 Err(err) => {
1205 tracing::debug!("Repair write to node failed: {err}");
1206 }
1207 }
1208 }
1209 let reached_quorum = self.quorum_reached(total_with_block);
1210 if reached_quorum {
1211 poa_metrics().repair_success_total.inc();
1212 } else {
1213 poa_metrics().repair_failure_total.inc();
1214 }
1215 Ok(reached_quorum)
1216 }
1217}
1218
/// Outcome of one `write_block` script invocation against a single Redis node.
///
/// Only the two recognized script rejections are mapped to variants here.
/// Any other script or connection failure is surfaced to the caller as an
/// `Err` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteBlockResult {
    /// The script returned successfully, so the block was written on this node.
    Written,
    /// The script refused the write because this height already exists on the
    /// node (error text containing `HEIGHT_EXISTS:`).
    HeightExists,
    /// The script's fencing check refused the write (error text containing
    /// `FENCING_ERROR:`). Callers treat this as having lost the lease.
    FencingRejected,
}
1228
1229impl PoAAdapter {
1230 pub fn new(shared_state: Option<SharedState>) -> Self {
1231 Self { shared_state }
1232 }
1233
1234 pub async fn manually_produce_blocks(
1235 &self,
1236 start_time: Option<Tai64>,
1237 mode: Mode,
1238 ) -> anyhow::Result<()> {
1239 self.shared_state
1240 .as_ref()
1241 .ok_or(anyhow!("The block production is disabled"))?
1242 .manually_produce_block(start_time, mode)
1243 .await
1244 }
1245}
1246
1247#[async_trait::async_trait]
1248impl BlockReconciliationReadPort for NoopReconciliationAdapter {
1249 async fn leader_state(
1250 &self,
1251 _next_height: BlockHeight,
1252 ) -> anyhow::Result<LeaderState> {
1253 Ok(LeaderState::ReconciledLeader)
1254 }
1255
1256 async fn release(&self) -> anyhow::Result<()> {
1257 Ok(())
1258 }
1259}
1260
1261#[async_trait::async_trait]
1262impl BlockReconciliationReadPort for RedisLeaderLeaseAdapter {
1263 async fn leader_state(
1264 &self,
1265 next_height: BlockHeight,
1266 ) -> anyhow::Result<LeaderState> {
1267 if self.can_produce_block().await? {
1268 poa_metrics().is_leader.set(1);
1269 if let Ok(epoch) = self.current_epoch_token.lock()
1270 && let Some(epoch) = *epoch
1271 {
1272 poa_metrics()
1273 .leader_epoch
1274 .set(i64::try_from(epoch).unwrap_or(i64::MAX));
1275 }
1276 let reconcile_start = std::time::Instant::now();
1277 let unreconciled_blocks = self.unreconciled_blocks(next_height).await?;
1278 poa_metrics()
1279 .reconciliation_duration_s
1280 .observe(reconcile_start.elapsed().as_secs_f64());
1281 if unreconciled_blocks.is_empty() {
1282 Ok(LeaderState::ReconciledLeader)
1283 } else {
1284 Ok(LeaderState::UnreconciledBlocks(unreconciled_blocks))
1285 }
1286 } else {
1287 poa_metrics().is_leader.set(0);
1288 Ok(LeaderState::ReconciledFollower)
1289 }
1290 }
1291
1292 async fn release(&self) -> anyhow::Result<()> {
1293 self.release_if_owner().await
1294 }
1295}
1296
1297#[async_trait::async_trait]
1298impl BlockReconciliationReadPort for ReconciliationAdapter {
1299 async fn leader_state(
1300 &self,
1301 next_height: BlockHeight,
1302 ) -> anyhow::Result<LeaderState> {
1303 match self {
1304 Self::Redis(adapter) => adapter.leader_state(next_height).await,
1305 Self::Noop(adapter) => adapter.leader_state(next_height).await,
1306 }
1307 }
1308
1309 async fn release(&self) -> anyhow::Result<()> {
1310 match self {
1311 Self::Redis(adapter) => adapter.release().await,
1312 Self::Noop(adapter) => adapter.release().await,
1313 }
1314 }
1315}
1316
1317impl Drop for RedisLeaderLeaseAdapter {
1318 fn drop(&mut self) {
1319 if std::sync::Arc::strong_count(&self.drop_release_guard) != 1 {
1320 return;
1321 }
1322
1323 let redis_clients = self
1324 .redis_nodes
1325 .iter()
1326 .map(|redis_node| redis_node.redis_client.clone())
1327 .collect::<Vec<_>>();
1328 if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
1329 let release_future = timeout(
1330 Duration::from_millis(100),
1331 Self::release_lease_on_clients(
1332 redis_clients,
1333 self.lease_key.clone(),
1334 self.lease_owner_token.clone(),
1335 self.node_timeout,
1336 ),
1337 );
1338 drop(runtime_handle.spawn(async move {
1339 if release_future.await.is_err() {
1340 error!("Failed to release leader lease: timeout");
1341 }
1342 }));
1343 return;
1344 }
1345
1346 Self::release_lease_on_clients_sync(
1347 redis_clients,
1348 self.lease_key.clone(),
1349 self.lease_owner_token.clone(),
1350 );
1351 }
1352}
1353
1354impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter {
1355 fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
1356 let epoch = match *self
1357 .current_epoch_token
1358 .lock()
1359 .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
1360 {
1361 Some(epoch) => epoch,
1362 None => {
1363 if matches!(
1364 block.consensus,
1365 fuel_core_types::blockchain::consensus::Consensus::Genesis(_)
1366 ) {
1367 tracing::debug!(
1368 "Skipping redis block publish for genesis block because fencing token is not initialized"
1369 );
1370 return Ok(());
1371 }
1372 return Err(anyhow!(
1373 "Cannot publish block because fencing token is not initialized"
1374 ));
1375 }
1376 };
1377 let block_data = postcard::to_allocvec(block)?;
1378 let successes = self
1379 .publish_block_on_all_nodes(epoch, block, &block_data)
1380 .into_iter()
1381 .map(|result| match result {
1382 Ok(WriteBlockResult::Written) => true,
1383 Ok(_) => false,
1384 Err(err) => {
1385 tracing::debug!("Redis publish on node failed: {err}");
1386 false
1387 }
1388 })
1389 .filter(|success| *success)
1390 .count();
1391 if self.quorum_reached(successes) {
1392 Ok(())
1393 } else {
1394 Err(anyhow!(
1395 "Failed to publish block to redis quorum with fencing checks"
1396 ))
1397 }
1398 }
1399}
1400
1401#[async_trait::async_trait]
1402impl ConsensusModulePort for PoAAdapter {
1403 async fn manually_produce_blocks(
1404 &self,
1405 start_time: Option<Tai64>,
1406 number_of_blocks: u32,
1407 ) -> anyhow::Result<()> {
1408 self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
1409 .await
1410 }
1411}
1412
#[cfg(feature = "p2p")]
impl P2pPort for P2PAdapter {
    /// Streams reserved-peer-count updates broadcast by the P2P service.
    ///
    /// Receive errors from the broadcast stream are skipped. With no P2P
    /// service attached, the returned stream never yields.
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        let Some(service) = &self.service else {
            return Box::pin(tokio_stream::pending());
        };
        let updates = BroadcastStream::new(service.subscribe_reserved_peers_count());
        Box::pin(updates.filter_map(|update| update.ok()))
    }
}
1426
1427#[cfg(not(feature = "p2p"))]
1428impl P2pPort for P2PAdapter {
1429 fn reserved_peers_count(&self) -> BoxStream<usize> {
1430 Box::pin(tokio_stream::pending())
1431 }
1432}
1433
/// Source of predefined blocks stored as `<height>.json` files in a directory.
///
/// When no directory is configured, no predefined blocks are available.
pub struct InDirectoryPredefinedBlocks {
    /// Directory holding the JSON block files, if one is configured.
    path_to_directory: Option<PathBuf>,
}

impl InDirectoryPredefinedBlocks {
    /// Creates a block source backed by `path_to_directory`.
    ///
    /// Pass `None` to disable predefined blocks entirely.
    pub fn new(path_to_directory: Option<PathBuf>) -> Self {
        Self { path_to_directory }
    }
}
1443
1444impl PredefinedBlocks for InDirectoryPredefinedBlocks {
1445 fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
1446 let Some(path) = &self.path_to_directory else {
1447 return Ok(None);
1448 };
1449
1450 let block_height: u32 = (*height).into();
1451 if block_exists(path.as_path(), block_height) {
1452 let block_path = block_path(path.as_path(), block_height);
1453 let block_bytes = std::fs::read(block_path)?;
1454 let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
1455 Ok(Some(block))
1456 } else {
1457 Ok(None)
1458 }
1459 }
1460}
1461
/// Builds the path of the JSON file for the predefined block at
/// `block_height`, i.e. `<path_to_directory>/<block_height>.json`.
pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
    let file_name = format!("{block_height}.json");
    path_to_directory.join(file_name)
}
1465
1466pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
1467 block_path(path_to_directory, block_height).exists()
1468}
1469
1470impl TransactionPool for TxPoolAdapter {
1471 fn new_txs_watcher(&self) -> watch::Receiver<()> {
1472 self.service.get_new_executable_txs_notifier()
1473 }
1474}
1475
1476#[async_trait::async_trait]
1477impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
1478 async fn produce_and_execute_block(
1479 &self,
1480 height: BlockHeight,
1481 block_time: Tai64,
1482 source: TransactionsSource,
1483 deadline: Instant,
1484 ) -> anyhow::Result<UncommittedResult<Changes>> {
1485 match source {
1486 TransactionsSource::TxPool => {
1487 self.block_producer
1488 .produce_and_execute_block_txpool(height, block_time, deadline)
1489 .await
1490 }
1491 TransactionsSource::SpecificTransactions(txs) => {
1492 self.block_producer
1493 .produce_and_execute_block_transactions(height, block_time, txs)
1494 .await
1495 }
1496 }
1497 }
1498
1499 async fn produce_predefined_block(
1500 &self,
1501 block: &Block,
1502 ) -> anyhow::Result<UncommittedResult<Changes>> {
1503 self.block_producer
1504 .produce_and_execute_predefined(block, ())
1505 .await
1506 }
1507}
1508
1509#[async_trait::async_trait]
1510impl BlockImporter for BlockImporterAdapter {
1511 async fn commit_result(
1512 &self,
1513 result: UncommittedImporterResult<Changes>,
1514 ) -> anyhow::Result<()> {
1515 self.block_importer
1516 .commit_result(result)
1517 .await
1518 .map_err(Into::into)
1519 }
1520
1521 async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> {
1522 self.block_importer
1523 .execute_and_commit(block)
1524 .await
1525 .map_err(Into::into)
1526 }
1527
1528 fn block_stream(&self) -> BoxStream<BlockImportInfo> {
1529 Box::pin(
1530 BroadcastStream::new(self.block_importer.subscribe())
1531 .filter_map(|result| result.ok())
1532 .map(|result| BlockImportInfo::from(result.shared_result)),
1533 )
1534 }
1535
1536 fn latest_block_height(&self) -> anyhow::Result<Option<BlockHeight>> {
1537 self.database.latest_block_height().map_err(Into::into)
1538 }
1539}
1540
1541#[cfg(all(test, feature = "leader_lock", not(feature = "not_leader_lock")))]
1542#[allow(non_snake_case)]
1543mod tests {
1544 use super::*;
1545 use fuel_core_importer::ports::BlockReconciliationWritePort;
1546 use fuel_core_poa::ports::BlockReconciliationReadPort;
1547 use fuel_core_types::blockchain::consensus::Consensus;
1548 use std::{
1549 io::Read as _,
1550 net::{
1551 SocketAddrV4,
1552 TcpListener,
1553 TcpStream,
1554 },
1555 process::{
1556 Child,
1557 Command,
1558 Stdio,
1559 },
1560 sync::mpsc,
1561 thread,
1562 time::{
1563 Duration,
1564 Instant,
1565 },
1566 };
1567
1568 #[tokio::test(flavor = "multi_thread")]
1569 async fn leader_state__when_same_height_has_multiple_stream_entries_then_returns_highest_epoch_block()
1570 {
1571 let redis = RedisTestServer::spawn();
1573 let lease_key = "poa:test:stream-conflict".to_string();
1574 let stream_key = format!("{lease_key}:block:stream");
1575 let adapter = RedisLeaderLeaseAdapter::new(
1576 vec![redis.redis_url()],
1577 lease_key,
1578 Duration::from_secs(2),
1579 Duration::from_millis(100),
1580 Duration::from_millis(50),
1581 Duration::from_millis(0),
1582 1,
1583 1000,
1584 )
1585 .expect("adapter should be created");
1586
1587 let low_epoch_block = poa_block_at_time(1, 10);
1588 let high_epoch_block = poa_block_at_time(1, 20);
1589
1590 let low_epoch_data =
1591 postcard::to_allocvec(&low_epoch_block).expect("serialize block");
1592 let high_epoch_data =
1593 postcard::to_allocvec(&high_epoch_block).expect("serialize block");
1594
1595 let redis_client =
1596 redis::Client::open(redis.redis_url()).expect("redis client should open");
1597 let mut conn = redis_client
1598 .get_connection()
1599 .expect("redis connection should open");
1600 append_stream_block(&mut conn, &stream_key, 1, &low_epoch_data, 1);
1601 append_stream_block(&mut conn, &stream_key, 1, &high_epoch_data, 2);
1602
1603 let leader_state = adapter
1605 .leader_state(1.into())
1606 .await
1607 .expect("leader_state should succeed");
1608
1609 let unreconciled_blocks = match leader_state {
1611 LeaderState::UnreconciledBlocks(blocks) => blocks,
1612 other => panic!("Expected unreconciled blocks, got: {other:?}"),
1613 };
1614 assert_eq!(unreconciled_blocks.len(), 1);
1615 assert_eq!(
1616 unreconciled_blocks[0].entity.header().time(),
1617 high_epoch_block.entity.header().time(),
1618 "Expected reconciliation to pick the highest epoch block for the same height",
1619 );
1620 }
1621
1622 #[tokio::test(flavor = "multi_thread")]
1623 async fn leader_state__when_same_height_same_epoch_has_multiple_stream_entries_then_keeps_latest_entry()
1624 {
1625 let redis = RedisTestServer::spawn();
1627 let lease_key = "poa:test:equal-epoch-latest-entry".to_string();
1628 let stream_key = format!("{lease_key}:block:stream");
1629 let adapter = RedisLeaderLeaseAdapter::new(
1630 vec![redis.redis_url()],
1631 lease_key,
1632 Duration::from_secs(2),
1633 Duration::from_millis(100),
1634 Duration::from_millis(50),
1635 Duration::from_millis(0),
1636 1,
1637 1000,
1638 )
1639 .expect("adapter should be created");
1640
1641 let stale_block = poa_block_at_time(1, 10);
1642 let retry_block = poa_block_at_time(1, 20);
1643 let stale_data =
1644 postcard::to_allocvec(&stale_block).expect("stale block should serialize");
1645 let retry_data =
1646 postcard::to_allocvec(&retry_block).expect("retry block should serialize");
1647
1648 let redis_client =
1649 redis::Client::open(redis.redis_url()).expect("redis client should open");
1650 let mut conn = redis_client
1651 .get_connection()
1652 .expect("redis connection should open");
1653 append_stream_block(&mut conn, &stream_key, 1, &stale_data, 1);
1654 append_stream_block(&mut conn, &stream_key, 1, &retry_data, 1);
1655
1656 let leader_state = adapter
1658 .leader_state(1.into())
1659 .await
1660 .expect("leader_state should succeed");
1661
1662 let unreconciled_blocks = match leader_state {
1664 LeaderState::UnreconciledBlocks(blocks) => blocks,
1665 other => panic!("Expected unreconciled blocks, got: {other:?}"),
1666 };
1667 assert_eq!(unreconciled_blocks.len(), 1);
1668 assert_eq!(
1669 unreconciled_blocks[0].entity.id(),
1670 retry_block.entity.id(),
1671 "Expected reconciliation to keep latest stream entry for equal epoch",
1672 );
1673 }
1674
1675 #[tokio::test(flavor = "multi_thread")]
1676 async fn leader_state__when_height_has_disagreeing_block_ids_then_repairs_with_highest_epoch_block()
1677 {
1678 let redis_a = RedisTestServer::spawn();
1680 let redis_b = RedisTestServer::spawn();
1681 let redis_c = RedisTestServer::spawn();
1682 let lease_key = "poa:test:epoch-quorum-block-mismatch".to_string();
1683 let stream_key = format!("{lease_key}:block:stream");
1684 let adapter = new_test_adapter(
1685 vec![
1686 redis_a.redis_url(),
1687 redis_b.redis_url(),
1688 redis_c.redis_url(),
1689 ],
1690 lease_key,
1691 );
1692 assert!(
1693 adapter
1694 .acquire_lease_if_free()
1695 .await
1696 .expect("acquire should succeed"),
1697 "adapter should acquire lease"
1698 );
1699
1700 let block_a = poa_block_at_time(1, 10);
1701 let block_b = poa_block_at_time(1, 20);
1702 let block_a_data =
1703 postcard::to_allocvec(&block_a).expect("block a should serialize");
1704 let block_b_data =
1705 postcard::to_allocvec(&block_b).expect("block b should serialize");
1706
1707 let redis_a_client =
1708 redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1709 let redis_b_client =
1710 redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1711 let mut conn_a = redis_a_client
1712 .get_connection()
1713 .expect("redis a connection should open");
1714 let mut conn_b = redis_b_client
1715 .get_connection()
1716 .expect("redis b connection should open");
1717
1718 append_stream_block(&mut conn_a, &stream_key, 1, &block_a_data, 7);
1720 append_stream_block(&mut conn_b, &stream_key, 1, &block_b_data, 7);
1721
1722 let leader_state = adapter
1725 .leader_state(1.into())
1726 .await
1727 .expect("leader_state should succeed");
1728
1729 assert!(
1731 matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1732 "Expected repair to pick one block and reach quorum, got {leader_state:?}",
1733 );
1734 }
1735
1736 #[tokio::test(flavor = "multi_thread")]
1748 async fn leader_state__when_same_block_has_different_epochs_across_nodes_then_reconciles_without_repair()
1749 {
1750 let redis_a = RedisTestServer::spawn();
1753 let redis_b = RedisTestServer::spawn();
1754 let redis_c = RedisTestServer::spawn();
1755 let lease_key = "poa:test:same-block-different-epochs".to_string();
1756 let stream_key = format!("{lease_key}:block:stream");
1757 let adapter = new_test_adapter(
1758 vec![
1759 redis_a.redis_url(),
1760 redis_b.redis_url(),
1761 redis_c.redis_url(),
1762 ],
1763 lease_key,
1764 );
1765 assert!(
1766 adapter
1767 .acquire_lease_if_free()
1768 .await
1769 .expect("acquire should succeed"),
1770 "adapter should acquire lease"
1771 );
1772
1773 let block = poa_block_at_time(1, 10);
1778 let block_data = postcard::to_allocvec(&block).expect("should serialize");
1779
1780 let redis_a_client =
1781 redis::Client::open(redis_a.redis_url()).expect("redis a client");
1782 let redis_b_client =
1783 redis::Client::open(redis_b.redis_url()).expect("redis b client");
1784 let redis_c_client =
1785 redis::Client::open(redis_c.redis_url()).expect("redis c client");
1786 let mut conn_a = redis_a_client.get_connection().expect("redis a conn");
1787 let mut conn_b = redis_b_client.get_connection().expect("redis b conn");
1788 let mut conn_c = redis_c_client.get_connection().expect("redis c conn");
1789
1790 append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 5);
1792 append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 7);
1793 append_stream_block(&mut conn_c, &stream_key, 1, &block_data, 9);
1794
1795 let leader_state = adapter
1797 .leader_state(1.into())
1798 .await
1799 .expect("leader_state should succeed");
1800
1801 assert!(
1806 matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1807 "Expected block to be reconciled from quorum across mixed epochs, got {leader_state:?}"
1808 );
1809 }
1810
1811 #[tokio::test(flavor = "multi_thread")]
1812 async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it()
1813 {
1814 let redis_a = RedisTestServer::spawn();
1816 let redis_b = RedisTestServer::spawn();
1817 let redis_c = RedisTestServer::spawn();
1818 let lease_key = "poa:test:below-quorum".to_string();
1819 let stream_key = format!("{lease_key}:block:stream");
1820 let adapter = new_test_adapter(
1821 vec![
1822 redis_a.redis_url(),
1823 redis_b.redis_url(),
1824 redis_c.redis_url(),
1825 ],
1826 lease_key,
1827 );
1828 assert!(
1829 adapter
1830 .acquire_lease_if_free()
1831 .await
1832 .expect("acquire should succeed"),
1833 "adapter should acquire lease"
1834 );
1835
1836 let orphan_block = poa_block_at_time(1, 10);
1837 let orphan_block_data =
1838 postcard::to_allocvec(&orphan_block).expect("orphan block should serialize");
1839
1840 let redis_client =
1841 redis::Client::open(redis_a.redis_url()).expect("redis client should open");
1842 let mut conn = redis_client
1843 .get_connection()
1844 .expect("redis connection should open");
1845 append_stream_block(&mut conn, &stream_key, 1, &orphan_block_data, 1);
1846
1847 let leader_state = adapter
1849 .leader_state(1.into())
1850 .await
1851 .expect("leader_state should succeed");
1852
1853 assert!(
1855 matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1856 "Expected sub-quorum entry to be repaired and returned, got {leader_state:?}"
1857 );
1858 }
1859
1860 #[tokio::test(flavor = "multi_thread")]
1861 async fn leader_state__when_contiguous_heights_have_quorum_then_repairs_sub_quorum_tail()
1862 {
1863 let redis_a = RedisTestServer::spawn();
1865 let redis_b = RedisTestServer::spawn();
1866 let redis_c = RedisTestServer::spawn();
1867 let lease_key = "poa:test:contiguous-quorum".to_string();
1868 let stream_key = format!("{lease_key}:block:stream");
1869 let adapter = new_test_adapter(
1870 vec![
1871 redis_a.redis_url(),
1872 redis_b.redis_url(),
1873 redis_c.redis_url(),
1874 ],
1875 lease_key,
1876 );
1877 assert!(
1878 adapter
1879 .acquire_lease_if_free()
1880 .await
1881 .expect("acquire should succeed"),
1882 "adapter should acquire lease"
1883 );
1884
1885 let h1 = poa_block_at_time(1, 10);
1886 let h2 = poa_block_at_time(2, 20);
1887 let h3 = poa_block_at_time(3, 30);
1888 let h1_data = postcard::to_allocvec(&h1).expect("h1 should serialize");
1889 let h2_data = postcard::to_allocvec(&h2).expect("h2 should serialize");
1890 let h3_data = postcard::to_allocvec(&h3).expect("h3 should serialize");
1891
1892 let redis_a_client =
1893 redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1894 let redis_b_client =
1895 redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1896 let mut conn_a = redis_a_client
1897 .get_connection()
1898 .expect("redis a connection should open");
1899 let mut conn_b = redis_b_client
1900 .get_connection()
1901 .expect("redis b connection should open");
1902
1903 append_stream_block(&mut conn_a, &stream_key, 1, &h1_data, 1);
1905 append_stream_block(&mut conn_b, &stream_key, 1, &h1_data, 1);
1906 append_stream_block(&mut conn_a, &stream_key, 2, &h2_data, 1);
1908 append_stream_block(&mut conn_b, &stream_key, 2, &h2_data, 1);
1909 append_stream_block(&mut conn_a, &stream_key, 3, &h3_data, 1);
1911
1912 let leader_state = adapter
1914 .leader_state(1.into())
1915 .await
1916 .expect("leader_state should succeed");
1917
1918 let unreconciled_blocks = match leader_state {
1920 LeaderState::UnreconciledBlocks(blocks) => blocks,
1921 other => panic!("Expected unreconciled blocks, got: {other:?}"),
1922 };
1923 assert_eq!(
1924 unreconciled_blocks
1925 .iter()
1926 .map(|b| u32::from(*b.entity.header().height()))
1927 .collect::<Vec<_>>(),
1928 vec![1, 2, 3],
1929 "Expected all heights including repaired sub-quorum h3",
1930 );
1931 }
1932
1933 #[tokio::test(flavor = "multi_thread")]
1934 async fn leader_state__when_contiguous_quorum_blocks_are_present_then_returns_all_available_contiguous_blocks()
1935 {
1936 let redis_a = RedisTestServer::spawn();
1938 let redis_b = RedisTestServer::spawn();
1939 let redis_c = RedisTestServer::spawn();
1940 let lease_key = "poa:test:contiguous-over-128".to_string();
1941 let stream_key = format!("{lease_key}:block:stream");
1942 let adapter = RedisLeaderLeaseAdapter::new(
1943 vec![
1944 redis_a.redis_url(),
1945 redis_b.redis_url(),
1946 redis_c.redis_url(),
1947 ],
1948 lease_key,
1949 Duration::from_secs(2),
1950 Duration::from_millis(100),
1951 Duration::from_millis(50),
1952 Duration::from_millis(0),
1953 1,
1954 1000,
1955 )
1956 .expect("adapter should be created");
1957 let redis_a_client =
1958 redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1959 let redis_b_client =
1960 redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1961 let redis_c_client =
1962 redis::Client::open(redis_c.redis_url()).expect("redis c client should open");
1963 let mut conn_a = redis_a_client
1964 .get_connection()
1965 .expect("redis a connection should open");
1966 let mut conn_b = redis_b_client
1967 .get_connection()
1968 .expect("redis b connection should open");
1969 let mut conn_c = redis_c_client
1970 .get_connection()
1971 .expect("redis c connection should open");
1972 let _ = &mut conn_c;
1973
1974 (1_u32..=129_u32).for_each(|height| {
1975 let block = poa_block_at_time(height, u64::from(height));
1976 let block_data =
1977 postcard::to_allocvec(&block).expect("block should serialize");
1978 append_stream_block(&mut conn_a, &stream_key, height, &block_data, 1);
1979 append_stream_block(&mut conn_b, &stream_key, height, &block_data, 1);
1980 });
1981
1982 let leader_state = adapter
1984 .leader_state(1.into())
1985 .await
1986 .expect("leader_state should succeed");
1987
1988 let unreconciled_blocks = match leader_state {
1990 LeaderState::UnreconciledBlocks(blocks) => blocks,
1991 other => panic!("Expected unreconciled blocks, got: {other:?}"),
1992 };
1993 assert_eq!(
1994 unreconciled_blocks.len(),
1995 129,
1996 "Expected all contiguous quorum-backed heights to reconcile in one call",
1997 );
1998 }
1999
2000 #[tokio::test(flavor = "multi_thread")]
2001 async fn publish_produced_block__when_fencing_token_is_uninitialized_then_returns_error()
2002 {
2003 let redis_a = RedisTestServer::spawn();
2005 let redis_b = RedisTestServer::spawn();
2006 let redis_c = RedisTestServer::spawn();
2007 let lease_key = "poa:test:missing-epoch".to_string();
2008 let redis_urls = vec![
2009 redis_a.redis_url(),
2010 redis_b.redis_url(),
2011 redis_c.redis_url(),
2012 ];
2013 let adapter = new_test_adapter(redis_urls, lease_key);
2014 let block = poa_block_at_time(1, 100);
2015
2016 let publish_result = adapter.publish_produced_block(&block);
2018
2019 assert!(
2021 publish_result.is_err(),
2022 "Publish should fail when fencing token is not initialized"
2023 );
2024 }
2025
2026 #[tokio::test(flavor = "multi_thread")]
2027 async fn release__when_adapter_is_not_lease_owner_then_returns_ok() {
2028 let redis_a = RedisTestServer::spawn();
2030 let redis_b = RedisTestServer::spawn();
2031 let redis_c = RedisTestServer::spawn();
2032 let lease_key = "poa:test:release-follower".to_string();
2033 let redis_urls = vec![
2034 redis_a.redis_url(),
2035 redis_b.redis_url(),
2036 redis_c.redis_url(),
2037 ];
2038 let adapter = new_test_adapter(redis_urls, lease_key);
2039
2040 let release_result = adapter.release().await;
2042
2043 assert!(
2045 release_result.is_ok(),
2046 "Release should be idempotent for adapters that do not own quorum lease"
2047 );
2048 }
2049
2050 #[tokio::test(flavor = "multi_thread")]
2051 async fn drop__when_non_last_clone_is_dropped_then_does_not_release_shared_lease() {
2052 let redis_a = RedisTestServer::spawn();
2054 let redis_b = RedisTestServer::spawn();
2055 let redis_c = RedisTestServer::spawn();
2056 let lease_key = "poa:test:drop-non-last-clone".to_string();
2057 let redis_urls = vec![
2058 redis_a.redis_url(),
2059 redis_b.redis_url(),
2060 redis_c.redis_url(),
2061 ];
2062 let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
2063 assert!(
2064 adapter
2065 .acquire_lease_if_free()
2066 .await
2067 .expect("acquire should succeed"),
2068 "Adapter should acquire lease"
2069 );
2070 let adapter_clone = adapter.clone();
2071 let owner_token = adapter.lease_owner_token.clone();
2072
2073 drop(adapter_clone);
2075 sleep(Duration::from_millis(50)).await;
2076
2077 let owners = redis_urls
2079 .iter()
2080 .filter(|redis_url| {
2081 read_lease_owner(redis_url, &lease_key).as_deref()
2082 == Some(owner_token.as_str())
2083 })
2084 .count();
2085 assert!(
2086 owners >= 2,
2087 "Dropping a non-last clone must not release quorum lease ownership"
2088 );
2089 }
2090
2091 #[tokio::test(flavor = "multi_thread")]
2092 async fn leader_state__when_lease_is_free_then_acquires_quorum_ownership() {
2093 let redis_a = RedisTestServer::spawn();
2095 let redis_b = RedisTestServer::spawn();
2096 let redis_c = RedisTestServer::spawn();
2097 let lease_key = "poa:test:acquire-on-leader-state".to_string();
2098 let redis_urls = vec![
2099 redis_a.redis_url(),
2100 redis_b.redis_url(),
2101 redis_c.redis_url(),
2102 ];
2103 let adapter = RedisLeaderLeaseAdapter::new(
2104 redis_urls.clone(),
2105 lease_key.clone(),
2106 Duration::from_millis(500),
2107 Duration::from_millis(100),
2108 Duration::from_millis(50),
2109 Duration::from_millis(0),
2110 1,
2111 1000,
2112 )
2113 .expect("adapter should be created");
2114
2115 let state = adapter
2117 .leader_state(1.into())
2118 .await
2119 .expect("leader_state should succeed");
2120 let owners = redis_urls
2121 .iter()
2122 .filter(|redis_url| {
2123 read_lease_owner(redis_url, &lease_key).as_deref()
2124 == Some(adapter.lease_owner_token.as_str())
2125 })
2126 .count();
2127
2128 assert!(
2130 matches!(state, LeaderState::ReconciledLeader),
2131 "leader_state should acquire and report leader ownership when lease is free"
2132 );
2133 assert!(
2134 owners >= 2,
2135 "Lease ownership should be present on quorum after acquisition"
2136 );
2137 }
2138
2139 #[tokio::test(flavor = "multi_thread")]
2140 async fn leader_state__when_lease_expires_then_another_adapter_becomes_leader() {
2141 let redis_a = RedisTestServer::spawn();
2143 let redis_b = RedisTestServer::spawn();
2144 let redis_c = RedisTestServer::spawn();
2145 let lease_key = "poa:test:ttl-expiry-handoff".to_string();
2146 let redis_urls = vec![
2147 redis_a.redis_url(),
2148 redis_b.redis_url(),
2149 redis_c.redis_url(),
2150 ];
2151 let first_adapter = RedisLeaderLeaseAdapter::new(
2152 redis_urls.clone(),
2153 lease_key.clone(),
2154 Duration::from_millis(300),
2155 Duration::from_millis(100),
2156 Duration::from_millis(50),
2157 Duration::from_millis(0),
2158 1,
2159 1000,
2160 )
2161 .expect("first adapter should be created");
2162 let second_adapter = RedisLeaderLeaseAdapter::new(
2163 redis_urls.clone(),
2164 lease_key.clone(),
2165 Duration::from_millis(300),
2166 Duration::from_millis(100),
2167 Duration::from_millis(50),
2168 Duration::from_millis(0),
2169 1,
2170 1000,
2171 )
2172 .expect("second adapter should be created");
2173
2174 let first_state = first_adapter
2175 .leader_state(1.into())
2176 .await
2177 .expect("first leader_state should succeed");
2178 sleep(Duration::from_millis(900)).await;
2179
2180 let second_state = second_adapter
2182 .leader_state(1.into())
2183 .await
2184 .expect("second leader_state should succeed");
2185 let second_owner_count = redis_urls
2186 .iter()
2187 .filter(|redis_url| {
2188 read_lease_owner(redis_url, &lease_key).as_deref()
2189 == Some(second_adapter.lease_owner_token.as_str())
2190 })
2191 .count();
2192
2193 assert!(
2195 matches!(first_state, LeaderState::ReconciledLeader),
2196 "First adapter should acquire lease initially"
2197 );
2198 assert!(
2199 matches!(second_state, LeaderState::ReconciledLeader),
2200 "Second adapter should become leader after TTL expiry"
2201 );
2202 assert!(
2203 second_owner_count >= 2,
2204 "Second adapter should own lease on quorum nodes after takeover"
2205 );
2206 }
2207
2208 #[tokio::test(flavor = "multi_thread")]
2209 async fn publish_produced_block__when_previous_leader_writes_after_handoff_then_rejects_zombie_write()
2210 {
2211 let redis_a = RedisTestServer::spawn();
2213 let redis_b = RedisTestServer::spawn();
2214 let redis_c = RedisTestServer::spawn();
2215 let lease_key = "poa:test:zombie-leader".to_string();
2216 let redis_urls = vec![
2217 redis_a.redis_url(),
2218 redis_b.redis_url(),
2219 redis_c.redis_url(),
2220 ];
2221 let old_leader = new_test_adapter(redis_urls.clone(), lease_key.clone());
2222 let current_leader = new_test_adapter(redis_urls, lease_key.clone());
2223 let block = poa_block_at_time(1, 111);
2224
2225 assert!(
2226 old_leader
2227 .acquire_lease_if_free()
2228 .await
2229 .expect("acquire should succeed"),
2230 "Old leader should acquire initial lease"
2231 );
2232 clear_lease_on_nodes(
2233 &[
2234 redis_a.redis_url(),
2235 redis_b.redis_url(),
2236 redis_c.redis_url(),
2237 ],
2238 &lease_key,
2239 );
2240 assert!(
2241 current_leader
2242 .acquire_lease_if_free()
2243 .await
2244 .expect("acquire should succeed"),
2245 "Current leader should acquire lease after handoff"
2246 );
2247
2248 let zombie_write = old_leader.publish_produced_block(&block);
2250
2251 assert!(
2253 zombie_write.is_err(),
2254 "Old leader write should be fenced after handoff"
2255 );
2256 let current_state = current_leader
2257 .leader_state(1.into())
2258 .await
2259 .expect("leader_state should succeed");
2260 assert!(
2261 matches!(current_state, LeaderState::ReconciledLeader),
2262 "Zombie partial writes must not be considered committed"
2263 );
2264 }
2265
    /// Publishing must still reach quorum when one node's stored epoch lags
    /// behind the leader's epoch. The first successful write is expected to
    /// raise that node's epoch back to the leader's value.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_epoch_is_behind_on_one_node_then_first_write_heals_epoch()
    {
        // given: three nodes and an adapter that acquires the lease
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-healing".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key.clone());
        let epoch_key = format!("{lease_key}:epoch:token");
        let block = poa_block_at_time(1, 222);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        // The epoch the adapter recorded locally when it acquired the lease.
        let leader_epoch = (*adapter.current_epoch_token.lock().expect("poisoned lock"))
            .expect("epoch should be initialized");
        // Roll node A's stored epoch back by one so it trails the leader.
        let stale_epoch = leader_epoch.saturating_sub(1);
        set_epoch(&redis_a.redis_url(), &epoch_key, stale_epoch);

        // when
        let publish_result = adapter.publish_produced_block(&block);

        // then: quorum write succeeds and node A's epoch is back at the leader's
        assert!(
            publish_result.is_ok(),
            "Publish should still succeed on quorum"
        );
        let healed_epoch = read_epoch(&redis_a.redis_url(), &epoch_key);
        assert_eq!(
            healed_epoch, leader_epoch,
            "First successful write should heal lagging epoch"
        );
    }
2308
    /// A successful publish is expected to refresh the lease TTL.
    ///
    /// The adapter is created with a short first duration (700ms), and the test
    /// waits past that window (500ms + 400ms). The lease must still be owned by
    /// this adapter on at least two of the three nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_write_succeeds_then_extends_lease_ttl() {
        // given: three nodes and an adapter whose first duration argument is
        // 700ms (presumably the lease TTL — confirm against `new`)
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:publish-extends-lease-ttl".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = RedisLeaderLeaseAdapter::new(
            redis_urls.clone(),
            lease_key.clone(),
            Duration::from_millis(700),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let block = poa_block_at_time(1, 444);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        // Let most of the 700ms window elapse before publishing.
        sleep(Duration::from_millis(500)).await;

        // when
        let publish_result = adapter.publish_produced_block(&block);
        // 500ms + 400ms is past the original 700ms window. The lease can only
        // still be ours if the publish extended it.
        sleep(Duration::from_millis(400)).await;
        let owners = redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, &lease_key).as_deref()
                    == Some(adapter.lease_owner_token.as_str())
            })
            .count();

        // then: 2 of 3 nodes is a quorum
        assert!(publish_result.is_ok(), "Publish should succeed on quorum");
        assert!(
            owners >= 2,
            "Successful write should extend lease TTL on quorum beyond original window"
        );
    }
2360
    /// If fewer than a quorum of nodes accept a write, publish must fail.
    /// `unreconciled_blocks` must then report an error for the orphaned minority
    /// entry instead of returning an empty result.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_write_succeeds_on_less_than_quorum_then_entry_is_not_reconciled()
    {
        // given: the adapter acquires the lease, then nodes B and C are
        // reassigned to "other-owner", leaving only node A for this adapter
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:partial-write".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key.clone());
        let block = poa_block_at_time(1, 333);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        set_lease_owner(
            &redis_b.redis_url(),
            &lease_key,
            "other-owner",
            adapter.lease_ttl_millis,
        );
        set_lease_owner(
            &redis_c.redis_url(),
            &lease_key,
            "other-owner",
            adapter.lease_ttl_millis,
        );

        // when
        let publish_result = adapter.publish_produced_block(&block);
        let unreconciled = adapter.unreconciled_blocks(1.into()).await;

        // then
        assert!(
            publish_result.is_err(),
            "Publish must fail when fewer than quorum nodes accept write"
        );
        assert!(
            unreconciled.is_err(),
            "Unresolved backlog should return an error instead of empty result"
        );
        // Node A, the only one that accepted the write, keeps the entry.
        assert_eq!(
            stream_len(&redis_a.redis_url(), &stream_key),
            1,
            "One orphan entry should exist on the single successful node"
        );
    }
2416
2417 #[tokio::test(flavor = "multi_thread")]
2418 async fn unreconciled_blocks__when_quorum_latest_height_is_below_next_height_then_returns_empty()
2419 {
2420 let redis = RedisTestServer::spawn();
2422 let lease_key = "poa:test:cursor-fast-path".to_string();
2423 let stream_key = format!("{lease_key}:block:stream");
2424 let adapter = RedisLeaderLeaseAdapter::new(
2425 vec![redis.redis_url()],
2426 lease_key,
2427 Duration::from_secs(2),
2428 Duration::from_millis(100),
2429 Duration::from_millis(50),
2430 Duration::from_millis(0),
2431 1,
2432 1000,
2433 )
2434 .expect("adapter should be created");
2435 let block = poa_block_at_time(1, 10);
2436 let block_data = postcard::to_allocvec(&block).expect("serialize block");
2437 let redis_client =
2438 redis::Client::open(redis.redis_url()).expect("redis client should open");
2439 let mut conn = redis_client
2440 .get_connection()
2441 .expect("redis connection should open");
2442 append_stream_block(&mut conn, &stream_key, 1, &block_data, 1);
2443
2444 let blocks = adapter
2446 .unreconciled_blocks(2.into())
2447 .await
2448 .expect("reconciliation read should succeed");
2449
2450 assert!(
2452 blocks.is_empty(),
2453 "Expected fast path to skip full reconciliation"
2454 );
2455 }
2456
    /// Reads a single node's stream starting from a given height.
    ///
    /// The first read from height 1 returns both existing entries. After height
    /// 3 is appended, a read from height 3 returns only that entry.
    #[tokio::test(flavor = "multi_thread")]
    async fn read_stream_entries_on_node__when_next_height_is_provided_then_reads_matching_entries()
    {
        // given: a single-node adapter whose stream holds heights 1 and 2
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-incremental-read".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        let h1 = poa_block_at_time(1, 10);
        let h2 = poa_block_at_time(2, 20);
        let h3 = poa_block_at_time(3, 30);
        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
        let redis_node = adapter.redis_nodes[0].clone();

        // when: read from height 1, append height 3, then read from height 3
        let first_read = adapter
            .read_stream_entries_on_node(&redis_node, 1, 1000)
            .await
            .expect("first read should succeed");
        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
        let second_read = adapter
            .read_stream_entries_on_node(&redis_node, 3, 1000)
            .await
            .expect("second read should succeed");

        // then
        assert_eq!(
            first_read.len(),
            2,
            "Expected initial read to include existing entries"
        );
        assert_eq!(
            second_read.len(),
            1,
            "Expected height-filtered read to include only matching entries"
        );
        // Each entry is a tuple whose third field holds the block.
        assert_eq!(
            u32::from(*second_read[0].2.entity.header().height()),
            3,
            "Expected only the requested next height"
        );
    }
2518
2519 #[tokio::test(flavor = "multi_thread")]
2520 async fn read_stream_entries_on_node__when_max_entries_is_small_then_caps_results() {
2521 let redis = RedisTestServer::spawn();
2523 let lease_key = "poa:test:cursor-pagination".to_string();
2524 let stream_key = format!("{lease_key}:block:stream");
2525 let adapter = RedisLeaderLeaseAdapter::new(
2526 vec![redis.redis_url()],
2527 lease_key,
2528 Duration::from_secs(2),
2529 Duration::from_millis(100),
2530 Duration::from_millis(50),
2531 Duration::from_millis(0),
2532 1,
2533 1000,
2534 )
2535 .expect("adapter should be created");
2536 let redis_client =
2537 redis::Client::open(redis.redis_url()).expect("redis client should open");
2538 let mut conn = redis_client
2539 .get_connection()
2540 .expect("redis connection should open");
2541 let h1 = poa_block_at_time(1, 10);
2542 let h2 = poa_block_at_time(2, 20);
2543 let h3 = poa_block_at_time(3, 30);
2544 let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
2545 let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
2546 let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
2547 append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
2548 append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
2549 append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
2550 let redis_node = adapter.redis_nodes[0].clone();
2551
2552 let first_page = adapter
2554 .read_stream_entries_on_node(&redis_node, 1, 2)
2555 .await
2556 .expect("first page should succeed");
2557 let second_page = adapter
2558 .read_stream_entries_on_node(&redis_node, 3, 2)
2559 .await
2560 .expect("second page should succeed");
2561
2562 assert_eq!(first_page.len(), 2, "Expected first page to be capped");
2564 assert_eq!(
2565 u32::from(*first_page[0].2.entity.header().height()),
2566 1,
2567 "Expected first page to start from earliest height"
2568 );
2569 assert_eq!(
2570 u32::from(*first_page[1].2.entity.header().height()),
2571 2,
2572 "Expected first page to include second height"
2573 );
2574 assert_eq!(
2575 second_page.len(),
2576 1,
2577 "Expected height filter to return only matching trailing entry"
2578 );
2579 assert_eq!(
2580 u32::from(*second_page[0].2.entity.header().height()),
2581 3,
2582 "Expected second read to include only the requested next height"
2583 );
2584 }
2585
    /// Reproduces a fork scenario.
    ///
    /// An entry for height 1 is already in the stream, as if an earlier publish
    /// partially landed. The leader then tries to publish a *different* block at
    /// the same height, which must be rejected. After a leader change,
    /// reconciliation must return the block actually in the stream, not the
    /// rejected one.
    #[tokio::test(flavor = "multi_thread")]
    async fn partial_publish_then_retry_at_same_height__new_leader_reconciles_stale_block()
    {
        // given: leader A on a single node
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:fork-repro".to_string();
        let stream_key = format!("{lease_key}:block:stream");

        let adapter_a = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter_a should acquire lease"
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        // Write block_a directly into the stream under A's epoch, bypassing
        // `publish_produced_block`.
        let block_a = poa_block_at_time(1, 100);
        let block_a_data = postcard::to_allocvec(&block_a).expect("serialize block_a");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &block_a_data, epoch_a as u32);

        // A different block at the same height (only the timestamp differs).
        let block_b = poa_block_at_time(1, 999);
        assert_ne!(
            block_a.entity.header().time(),
            block_b.entity.header().time()
        );

        // when: A retries height 1 with block_b
        let result = adapter_a.publish_produced_block(&block_b);
        assert!(
            result.is_err(),
            "publish at same height should fail due to HEIGHT_EXISTS"
        );

        // The rejected write must not have appended a second entry.
        assert_eq!(stream_len(&redis.redis_url(), &stream_key), 1);

        // Hand the lease over to a fresh adapter.
        adapter_a.release().await.expect("release should succeed");

        let adapter_b = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
        assert!(
            adapter_b
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter_b should acquire lease"
        );

        // then: the new leader reconciles block_a, the block that is in the stream
        let unreconciled = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");

        assert_eq!(unreconciled.len(), 1, "Should reconcile exactly one block");
        assert_eq!(
            unreconciled[0].entity.header().time(),
            block_a.entity.header().time(),
            "Reconciliation should return block_a (the only entry in the stream)"
        );
    }
2661
2662 fn spawn_halflive_redis_listener() -> u16 {
2699 let listener =
2700 TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2701 .expect("bind ephemeral port");
2702 let port = listener.local_addr().unwrap().port();
2703 thread::spawn(move || {
2704 for incoming in listener.incoming() {
2705 let Ok(mut stream) = incoming else {
2706 continue;
2707 };
2708 thread::spawn(move || {
2709 let mut buf = [0u8; 4096];
2710 while let Ok(n) = stream.read(&mut buf) {
2711 if n == 0 {
2712 return;
2713 }
2714 }
2715 });
2716 }
2717 });
2718 port
2719 }
2720
    /// Regression guard for the `redis` crate.
    ///
    /// `get_connection_with_timeout` against a peer that accepts TCP but never
    /// speaks the protocol must return an error within a bounded time instead of
    /// hanging.
    ///
    /// The call runs on its own thread, so the test can give up via
    /// `recv_timeout` even if the call never returns. In that case the worker
    /// thread is left blocked and the test panics.
    #[test]
    fn redis_get_connection_with_timeout__is_bounded_against_halflive_peer() {
        const NODE_TIMEOUT: Duration = Duration::from_secs(1);
        // Overall budget for the call; well above NODE_TIMEOUT.
        const MAX_WAIT: Duration = Duration::from_secs(5);

        let port = spawn_halflive_redis_listener();
        let url = format!("redis://127.0.0.1:{port}/");
        let client = redis::Client::open(url).expect("client open");

        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let start = Instant::now();
            let result = client.get_connection_with_timeout(NODE_TIMEOUT);
            let _ = tx.send((start.elapsed(), result.is_ok()));
        });

        match rx.recv_timeout(MAX_WAIT) {
            Ok((elapsed, ok)) => {
                assert!(
                    !ok,
                    "expected err against a half-alive peer, got Ok after \
                     {elapsed:?}",
                );
                // NOTE(review): `recv_timeout(MAX_WAIT)` already bounds this, so
                // the check is close to redundant. A bound relative to
                // NODE_TIMEOUT would also catch partial regressions.
                assert!(
                    elapsed <= MAX_WAIT,
                    "call took {elapsed:?}, expected to be bounded under \
                     {MAX_WAIT:?}",
                );
            }
            Err(_) => {
                panic!(
                    "REGRESSION: get_connection_with_timeout did not \
                     return within {MAX_WAIT:?} against a half-alive \
                     peer. The redis-crate upstream bug from 0.27.x has \
                     reappeared.",
                );
            }
        }
    }
2765
    /// Tests publishing with three healthy nodes plus one half-alive node (it
    /// accepts TCP but never replies). `publish_produced_block` must succeed on
    /// the healthy quorum and return within a bounded time, instead of blocking
    /// on the unresponsive peer.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__returns_within_bound_when_one_node_is_half_alive() {
        // given: 3 healthy nodes + 1 half-alive node
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let halflive_port = spawn_halflive_redis_listener();
        let lease_key = "poa:test:halflive-publish".to_string();
        let urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
            format!("redis://127.0.0.1:{halflive_port}/"),
        ];
        let adapter = new_test_adapter(urls, lease_key);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "should acquire quorum from 3 healthy nodes out of 4",
        );

        // when: `publish_produced_block` is synchronous (called without
        // `.await`), so run it on the blocking pool and bound it with a timeout
        let block = poa_block_at_time(1, 111);
        let publish_adapter = adapter.clone();
        let start = Instant::now();
        let publish = tokio::task::spawn_blocking(move || {
            publish_adapter.publish_produced_block(&block)
        });
        let deadline = Duration::from_secs(5);
        let result = tokio::time::timeout(deadline, publish)
            .await
            .expect(
                "publish_produced_block must return within the deadline — \
                 on the pre-fix code it hangs forever against a half-alive \
                 peer",
            )
            .expect("spawn_blocking should not panic");
        let elapsed = start.elapsed();

        // then
        result.expect("publish should succeed: 3/4 healthy nodes is quorum");
        assert!(
            elapsed < deadline,
            "publish took {elapsed:?}, expected well under {deadline:?}",
        );
    }
2826
    /// A throwaway `redis-server` child process bound to a loopback port.
    ///
    /// The port is chosen once at construction, so the server can be stopped
    /// and restarted behind the same URL.
    struct RedisTestServer {
        /// The running server process, or `None` while stopped.
        child: Option<Child>,
        /// Loopback port the server listens on.
        port: u16,
        /// `redis://127.0.0.1:{port}/`, built once at construction.
        redis_url: String,
    }
2832
2833 impl RedisTestServer {
2834 fn spawn() -> Self {
2835 let mut server = Self::new_stopped();
2836 server.start();
2837 server
2838 }
2839
2840 fn new_stopped() -> Self {
2841 let port = bind_unused_port();
2842 Self {
2843 child: None,
2844 port,
2845 redis_url: format!("redis://127.0.0.1:{port}/"),
2846 }
2847 }
2848
2849 fn start(&mut self) {
2850 if self.child.is_some() {
2851 return;
2852 }
2853 let child = spawn_redis_server(self.port);
2854 wait_for_redis_ready(self.port);
2855 self.child = Some(child);
2856 }
2857
2858 fn stop(&mut self) {
2859 if let Some(child) = self.child.as_mut() {
2860 let _ = child.kill();
2861 let _ = child.wait();
2862 }
2863 self.child = None;
2864 }
2865
2866 fn redis_url(&self) -> String {
2867 self.redis_url.clone()
2868 }
2869 }
2870
2871 impl Drop for RedisTestServer {
2872 fn drop(&mut self) {
2873 if let Some(child) = self.child.as_mut() {
2874 let _ = child.kill();
2875 let _ = child.wait();
2876 }
2877 }
2878 }
2879
2880 fn bind_unused_port() -> u16 {
2881 let socket =
2882 TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2883 .expect("Should bind an ephemeral port");
2884 let port = socket.local_addr().expect("Should get local addr").port();
2885 drop(socket);
2886 port
2887 }
2888
2889 fn spawn_redis_server(port: u16) -> Child {
2890 Command::new("redis-server")
2891 .arg("--port")
2892 .arg(port.to_string())
2893 .arg("--save")
2894 .arg("")
2895 .arg("--appendonly")
2896 .arg("no")
2897 .arg("--bind")
2898 .arg("127.0.0.1")
2899 .stdout(Stdio::null())
2900 .stderr(Stdio::null())
2901 .spawn()
2902 .expect("Failed to spawn redis-server")
2903 }
2904
2905 fn wait_for_redis_ready(port: u16) {
2906 let addr = SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, port);
2907 let start = std::time::Instant::now();
2908 let timeout = Duration::from_secs(5);
2909 while start.elapsed() < timeout {
2910 if TcpStream::connect(addr).is_ok() {
2911 return;
2912 }
2913 thread::sleep(Duration::from_millis(10));
2914 }
2915 panic!("Redis server did not become ready on port {port}");
2916 }
2917
2918 fn poa_block_at_time(height: u32, timestamp: u64) -> SealedBlock {
2919 let mut block = Block::default();
2920 block.header_mut().set_block_height(height.into());
2921 block
2922 .header_mut()
2923 .set_time(fuel_core_types::tai64::Tai64(timestamp));
2924 block.header_mut().recalculate_metadata();
2925 SealedBlock {
2926 entity: block,
2927 consensus: Consensus::PoA(Default::default()),
2928 }
2929 }
2930
2931 fn append_stream_block(
2932 conn: &mut redis::Connection,
2933 stream_key: &str,
2934 height: u32,
2935 data: &[u8],
2936 epoch: u32,
2937 ) {
2938 let _: String = redis::cmd("XADD")
2939 .arg(stream_key)
2940 .arg("*")
2941 .arg("height")
2942 .arg(height)
2943 .arg("data")
2944 .arg(data)
2945 .arg("epoch")
2946 .arg(epoch)
2947 .arg("timestamp")
2948 .arg(epoch)
2949 .query(conn)
2950 .expect("stream write should succeed");
2951 }
2952
    /// Builds an adapter over `redis_urls` using the parameter set shared by
    /// most tests in this module.
    ///
    /// The first duration (2s) is presumably the lease TTL — TODO confirm
    /// against `RedisLeaderLeaseAdapter::new`. The meaning of the other
    /// durations and of the trailing `1` / `1000` is defined by `new`, which is
    /// not visible here.
    ///
    /// # Panics
    /// Panics if the adapter cannot be constructed.
    fn new_test_adapter(
        redis_urls: Vec<String>,
        lease_key: String,
    ) -> RedisLeaderLeaseAdapter {
        RedisLeaderLeaseAdapter::new(
            redis_urls,
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created")
    }
2969
2970 fn set_epoch(redis_url: &str, epoch_key: &str, epoch: u64) {
2971 let redis_client =
2972 redis::Client::open(redis_url).expect("redis client should open");
2973 let mut conn = redis_client
2974 .get_connection()
2975 .expect("redis connection should open");
2976 let _: () = redis::cmd("SET")
2977 .arg(epoch_key)
2978 .arg(epoch)
2979 .query(&mut conn)
2980 .expect("epoch set should succeed");
2981 }
2982
2983 fn read_epoch(redis_url: &str, epoch_key: &str) -> u64 {
2984 let redis_client =
2985 redis::Client::open(redis_url).expect("redis client should open");
2986 let mut conn = redis_client
2987 .get_connection()
2988 .expect("redis connection should open");
2989 let epoch: Option<u64> = redis::cmd("GET")
2990 .arg(epoch_key)
2991 .query(&mut conn)
2992 .expect("epoch get should succeed");
2993 epoch.expect("epoch should exist")
2994 }
2995
2996 fn set_lease_owner(
2997 redis_url: &str,
2998 lease_key: &str,
2999 owner: &str,
3000 lease_ttl_millis: u64,
3001 ) {
3002 let redis_client =
3003 redis::Client::open(redis_url).expect("redis client should open");
3004 let mut conn = redis_client
3005 .get_connection()
3006 .expect("redis connection should open");
3007 let _: () = redis::cmd("SET")
3008 .arg(lease_key)
3009 .arg(owner)
3010 .arg("PX")
3011 .arg(lease_ttl_millis)
3012 .query(&mut conn)
3013 .expect("lease owner set should succeed");
3014 }
3015
3016 fn read_lease_owner(redis_url: &str, lease_key: &str) -> Option<String> {
3017 let redis_client =
3018 redis::Client::open(redis_url).expect("redis client should open");
3019 let mut conn = redis_client
3020 .get_connection()
3021 .expect("redis connection should open");
3022 redis::cmd("GET")
3023 .arg(lease_key)
3024 .query(&mut conn)
3025 .expect("lease owner get should succeed")
3026 }
3027
3028 fn clear_lease_on_nodes(redis_urls: &[String], lease_key: &str) {
3029 redis_urls.iter().for_each(|redis_url| {
3030 let redis_client = redis::Client::open(redis_url.as_str())
3031 .expect("redis client should open");
3032 let mut conn = redis_client
3033 .get_connection()
3034 .expect("redis connection should open");
3035 let _: () = redis::cmd("DEL")
3036 .arg(lease_key)
3037 .query(&mut conn)
3038 .expect("lease delete should succeed");
3039 });
3040 }
3041
3042 fn stream_len(redis_url: &str, stream_key: &str) -> usize {
3043 let redis_client =
3044 redis::Client::open(redis_url).expect("redis client should open");
3045 let mut conn = redis_client
3046 .get_connection()
3047 .expect("redis connection should open");
3048 redis::cmd("XLEN")
3049 .arg(stream_key)
3050 .query(&mut conn)
3051 .expect("stream length query should succeed")
3052 }
3053
    /// When a quorum of nodes cannot be read, `unreconciled_blocks` must return
    /// an error rather than treating the failed reads as "nothing to
    /// reconcile".
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_reads_fail_on_quorum_nodes__returns_error() {
        // given: block 1 published to all three nodes by leader A
        let mut redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:read-failure-fork".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire lease"
        );

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");

        let stream_key = format!("{lease_key}:block:stream");
        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);

        adapter_a.release().await.expect("release should succeed");

        // Take two of three nodes down, so only node C can be read.
        redis_a.stop();
        redis_b.stop();

        // A fresh adapter with a locally forced epoch. It never acquires the
        // lease in this test.
        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }

        // when
        let result = adapter_b.unreconciled_blocks(1.into()).await;

        // then
        assert!(
            result.is_err(),
            "unreconciled_blocks must return error when reads fail on quorum of nodes"
        );
    }
3113
    /// Tests a block that first lives on 2/3 nodes and then on only one.
    ///
    /// The block is written to nodes A and B (2/3) and is reconcilable. Node B
    /// is then restarted without persistence, so the block survives only on
    /// node A. The next leader's reconciliation is still expected to return it
    /// (per the assertion message, by reproposing it from node A to the other
    /// nodes).
    ///
    /// NOTE(review): the test name says the block is *dropped* below quorum,
    /// but the final assertion expects it to be recovered (`len() == 1`).
    /// Confirm the intended behavior and align the name with it.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_redis_node_restarts_and_loses_data__drops_block_below_quorum()
    {
        // given: leader A, block 1 planted on nodes A and B only
        let redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:data-loss-fork".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire lease"
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        let block = poa_block_at_time(1, 100);
        let block_data = postcard::to_allocvec(&block).expect("serialize");

        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
        let mut conn_b = client_b.get_connection().expect("conn");

        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, epoch_a as u32);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, epoch_a as u32);
        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 0);

        // Sanity check: with 2/3 copies the block is reconcilable.
        let pre_loss = adapter_a
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");
        assert_eq!(
            pre_loss.len(),
            1,
            "Block should be reconcilable with 2/3 nodes having it"
        );

        // Close our own handles to B, then restart it. With persistence
        // disabled, B comes back empty.
        drop(conn_b);
        drop(client_b);
        redis_b.stop();
        redis_b.start();

        assert_eq!(
            stream_len(&redis_b.redis_url(), &stream_key),
            0,
            "Restarted node should have empty stream"
        );

        adapter_a.release().await.expect("release should succeed");

        // when: a new leader takes over and reconciles
        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_b
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "New leader should acquire lease"
        );

        let post_loss = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");

        // then
        assert_eq!(
            post_loss.len(),
            1,
            "Repair should recover the block by reproposing from node A to the other nodes"
        );
    }
3215
    /// A leader that initially holds the lease on only a quorum (2/3) should
    /// claim a node that later becomes free when `has_lease_owner_quorum`
    /// runs. Subsequent publishes should then reach that node too.
    #[tokio::test(flavor = "multi_thread")]
    async fn has_lease_owner_quorum__expands_lock_to_non_owned_nodes() {
        // given: node C is pre-claimed with another candidate's token, so
        // leader A can only win nodes A and B
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:lock-expansion".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let client = redis::Client::open(redis_c.redis_url()).expect("client");
            let mut conn = client.get_connection().expect("conn");
            let _: () = redis::cmd("SET")
                .arg(&lease_key)
                .arg(&candidate_b.lease_owner_token)
                .arg("PX")
                .arg(5000u64)
                .query(&mut conn)
                .expect("set should succeed");
        }

        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire quorum (2/3)"
        );

        let owns_a = read_lease_owner(&redis_a.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        let owns_b = read_lease_owner(&redis_b.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        let owns_c = read_lease_owner(&redis_c.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        assert!(owns_a && owns_b, "A should own nodes A and B");
        assert!(!owns_c, "A should NOT own node C (held by B)");

        // Simulate the other candidate releasing node C.
        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);
        assert!(
            read_lease_owner(&redis_c.redis_url(), &lease_key).is_none(),
            "Node C should be free after B releases"
        );

        // when
        let has_quorum = adapter_a
            .has_lease_owner_quorum()
            .await
            .expect("quorum check should succeed");
        assert!(has_quorum, "A should still have quorum");

        // then: A now also owns C, and a publish lands on all three nodes
        let owns_c_after = read_lease_owner(&redis_c.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        assert!(owns_c_after, "Lock expansion should have acquired node C");

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed");

        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(
            stream_len(&redis_c.redis_url(), &stream_key),
            1,
            "Block should be written to expanded node C"
        );
    }
3302
    /// Tests a node whose epoch was bumped by a previous candidate.
    ///
    /// Node C's epoch is incremented by another candidate. After A acquires the
    /// lease, A's epoch must be strictly greater than the epoch C had before
    /// the acquisition (presumably so A's writes to C are not fenced). A
    /// publish is then expected to land on all three nodes.
    ///
    /// NOTE(review): C's lease is cleared *before* A acquires, and this test
    /// never calls `has_lease_owner_quorum` directly, so the name may overstate
    /// what is exercised.
    #[tokio::test(flavor = "multi_thread")]
    async fn has_lease_owner_quorum__adopts_higher_epoch_from_expanded_node() {
        // given: node C carries an epoch bumped by another candidate
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-adoption".to_string();
        let epoch_key = format!("{lease_key}:epoch:token");
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let client = redis::Client::open(redis_c.redis_url()).expect("client");
            let mut conn = client.get_connection().expect("conn");
            let _: () = redis::cmd("SET")
                .arg(&lease_key)
                .arg(&candidate_b.lease_owner_token)
                .arg("PX")
                .arg(5000u64)
                .query(&mut conn)
                .expect("set should succeed");
            let _: u64 = redis::cmd("INCR")
                .arg(&epoch_key)
                .query(&mut conn)
                .expect("incr should succeed");
        }
        // The other candidate lets go of C, leaving only its bumped epoch behind.
        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);

        let epoch_c_before = read_epoch(&redis_c.redis_url(), &epoch_key);

        // when
        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        // then
        assert!(
            epoch_a > epoch_c_before,
            "Leader's epoch ({epoch_a}) should be > node C's pre-acquisition epoch ({epoch_c_before})"
        );

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");

        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
    }
3376
    /// Smoke test for PoA metrics.
    ///
    /// It drives the successful-write, duplicate-height, stale-epoch ("zombie")
    /// write and reconciliation paths. It then checks that every expected
    /// `poa_*` name appears in `fuel_core_metrics::encode_metrics()` output and
    /// that a subset is non-zero. `encode_metrics` takes no registry argument,
    /// so values recorded by other tests in the same process may presumably
    /// show up here too.
    #[tokio::test(flavor = "multi_thread")]
    async fn metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths() {
        // given: a leader on three nodes
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:metrics-smoke".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        // Successful write path.
        let block1 = poa_block_at_time(1, 100);
        adapter
            .publish_produced_block(&block1)
            .expect("publish should succeed");

        // Duplicate-height path: a different block at an already-written height.
        let block1_dup = poa_block_at_time(1, 200);
        let dup_result = adapter.publish_produced_block(&block1_dup);
        assert!(dup_result.is_err(), "duplicate height should fail");

        // Zombie path: an adapter that never acquired the lease, with a forced
        // epoch of 1, attempts a write. Its result is intentionally ignored.
        let old_adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let mut epoch = old_adapter.current_epoch_token.lock().expect("lock");
            *epoch = Some(1);
        }
        let _zombie = old_adapter.publish_produced_block(&poa_block_at_time(2, 300));

        // Reconciliation path: plant an orphan at height 2 on node A only.
        let orphan = poa_block_at_time(2, 400);
        let orphan_data = postcard::to_allocvec(&orphan).expect("serialize orphan");
        let client_a = redis::Client::open(redis_a.redis_url()).expect("redis client");
        let mut conn_a = client_a.get_connection().expect("redis connection");
        let epoch_val =
            (*adapter.current_epoch_token.lock().expect("lock")).expect("epoch set");
        // `append_stream_block` takes a `u32` epoch.
        #[allow(clippy::cast_possible_truncation)]
        let epoch_u32 = epoch_val as u32;
        append_stream_block(&mut conn_a, &stream_key, 2, &orphan_data, epoch_u32);

        // when
        let state = adapter
            .leader_state(2.into())
            .await
            .expect("leader_state should succeed");
        assert!(
            matches!(
                state,
                LeaderState::UnreconciledBlocks(ref blocks) if !blocks.is_empty()
            ),
            "Should have unreconciled blocks: {state:?}"
        );

        // then
        let encoded =
            fuel_core_metrics::encode_metrics().expect("encode_metrics should succeed");

        // Dump every PoA line to stderr to ease debugging when an assertion fails.
        let poa_lines: Vec<&str> =
            encoded.lines().filter(|l| l.contains("poa_")).collect();
        for line in &poa_lines {
            eprintln!("{line}");
        }

        // Every metric must be rendered. This is a substring match, so suffixed
        // series such as `_bucket` also satisfy it.
        let expected_names = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_epoch_max_drift",
            "poa_stream_trim_headroom",
            "poa_write_block_success_total",
            "poa_write_block_height_exists_total",
            "poa_write_block_fencing_error_total",
            "poa_write_block_error_total",
            "poa_repair_success_total",
            "poa_promotion_success_total",
            "poa_promotion_duration_s",
            "poa_write_block_duration_s",
            "poa_reconciliation_duration_s",
            "poa_connection_reset_total",
        ];
        for name in &expected_names {
            assert!(
                encoded.contains(name),
                "Metric '{name}' missing from /v1/metrics output"
            );
        }

        // Metrics the paths above are expected to have moved away from zero.
        let non_zero_metrics = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_write_block_success_total",
            "poa_promotion_success_total",
            "poa_repair_success_total",
        ];
        for name in &non_zero_metrics {
            // First sample line for exactly `name`. Longer names that share the
            // prefix and `#` comment lines are skipped.
            let metric_line = encoded
                .lines()
                .find(|l| {
                    l.starts_with(name)
                        && !l.starts_with(&format!("{name}_"))
                        && !l.starts_with('#')
                })
                .unwrap_or_else(|| panic!("No data line for {name}"));
            // NOTE(review): only an exact ` 0` suffix is rejected. A float
            // sample rendered as `0.0` would pass; confirm the encoding.
            assert!(
                !metric_line.ends_with(" 0"),
                "Metric '{name}' should be non-zero, got: {metric_line}"
            );
        }
    }
3512
3513 #[tokio::test(flavor = "multi_thread")]
3516 async fn unreconciled_blocks__after_quorum_read_failure_then_backlog_remains_readable()
3517 {
3518 let mut redis_a = RedisTestServer::spawn();
3520 let mut redis_b = RedisTestServer::spawn();
3521 let redis_c = RedisTestServer::spawn();
3522 let lease_key = "poa:test:cursor-restore-quorum".to_string();
3523 let stream_key = format!("{lease_key}:block:stream");
3524 let redis_urls = vec![
3525 redis_a.redis_url(),
3526 redis_b.redis_url(),
3527 redis_c.redis_url(),
3528 ];
3529
3530 let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3531 assert!(
3532 adapter
3533 .acquire_lease_if_free()
3534 .await
3535 .expect("acquire should succeed"),
3536 "Should acquire lease"
3537 );
3538
3539 let block = poa_block_at_time(1, 100);
3540 adapter
3541 .publish_produced_block(&block)
3542 .expect("publish should succeed on all 3 nodes");
3543 adapter.release().await.expect("release should succeed");
3544
3545 redis_a.stop();
3547 redis_b.stop();
3548
3549 let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3550 {
3551 let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
3552 *epoch = Some(99);
3553 }
3554 let result = adapter_b.unreconciled_blocks(1.into()).await;
3555 assert!(result.is_err(), "Should fail with quorum read failure");
3556
3557 redis_a.start();
3559 redis_b.start();
3560
3561 let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
3563 let mut conn_a = client_a.get_connection().expect("conn");
3564 let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
3565 let mut conn_b = client_b.get_connection().expect("conn");
3566 let block_data = postcard::to_allocvec(&block).expect("serialize");
3567 append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
3568 append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 1);
3569
3570 let blocks = adapter_b
3572 .unreconciled_blocks(1.into())
3573 .await
3574 .expect("reconciliation should succeed now");
3575 assert_eq!(
3576 blocks.len(),
3577 1,
3578 "Quorum read failure should not make backlog entries unreadable"
3579 );
3580 }
3581
3582 #[tokio::test(flavor = "multi_thread")]
3585 async fn unreconciled_blocks__after_repair_failure_then_backlog_remains_readable() {
3586 let redis_a = RedisTestServer::spawn();
3588 let redis_b = RedisTestServer::spawn();
3589 let redis_c = RedisTestServer::spawn();
3590 let lease_key = "poa:test:cursor-restore-repair".to_string();
3591 let stream_key = format!("{lease_key}:block:stream");
3592 let redis_urls = vec![
3593 redis_a.redis_url(),
3594 redis_b.redis_url(),
3595 redis_c.redis_url(),
3596 ];
3597
3598 let block = poa_block_at_time(1, 100);
3599 let block_data = postcard::to_allocvec(&block).expect("serialize");
3600
3601 let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
3603 let mut conn_a = client_a.get_connection().expect("conn");
3604 append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
3605
3606 let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3608 {
3611 let mut epoch = adapter.current_epoch_token.lock().expect("lock");
3612 *epoch = Some(99);
3613 }
3614
3615 let result = adapter.unreconciled_blocks(1.into()).await;
3617 assert!(
3619 result.is_err(),
3620 "Should return error when repair fails and backlog remains unresolved"
3621 );
3622
3623 assert!(
3625 adapter
3626 .acquire_lease_if_free()
3627 .await
3628 .expect("acquire should succeed"),
3629 "Should acquire lease"
3630 );
3631
3632 let blocks = adapter
3634 .unreconciled_blocks(1.into())
3635 .await
3636 .expect("reconciliation should succeed with lock held");
3637 assert_eq!(
3638 blocks.len(),
3639 1,
3640 "Repair failure should not make backlog unreadable on the next round"
3641 );
3642 }
3643}