1use crate::{
2 fuel_core_graphql_api::ports::ConsensusModulePort,
3 service::adapters::{
4 BlockImporterAdapter,
5 BlockProducerAdapter,
6 P2PAdapter,
7 PoAAdapter,
8 TxPoolAdapter,
9 },
10};
11use anyhow::anyhow;
12use fuel_core_importer::ports::{
13 BlockReconciliationWritePort,
14 ImporterDatabase,
15};
16use fuel_core_metrics::poa_metrics::poa_metrics;
17use fuel_core_poa::{
18 ports::{
19 BlockImporter,
20 BlockReconciliationReadPort,
21 LeaderState,
22 P2pPort,
23 PredefinedBlocks,
24 TransactionPool,
25 TransactionsSource,
26 },
27 service::{
28 Mode,
29 SharedState,
30 },
31};
32use fuel_core_services::stream::BoxStream;
33use fuel_core_storage::transactional::Changes;
34use fuel_core_types::{
35 blockchain::{
36 SealedBlock,
37 block::Block,
38 primitives::BlockId,
39 },
40 fuel_types::BlockHeight,
41 services::{
42 block_importer::{
43 BlockImportInfo,
44 UncommittedResult as UncommittedImporterResult,
45 },
46 executor::UncommittedResult,
47 },
48 tai64::Tai64,
49};
50use std::{
51 collections::HashMap,
52 path::{
53 Path,
54 PathBuf,
55 },
56 time::Duration,
57};
58use tokio::{
59 sync::{
60 Mutex,
61 watch,
62 },
63 time::{
64 Instant,
65 sleep,
66 timeout,
67 },
68};
69use tokio_stream::{
70 StreamExt,
71 wrappers::BroadcastStream,
72};
73use tracing::error;
74
75pub mod pre_confirmation_signature;
76
// Lua scripts shipped with the crate and embedded at compile time, so each
// multi-step Redis operation below executes atomically on the server side.

/// Returns 1 when the lease key is currently held by the caller's owner token.
const CHECK_LEASE_OWNER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/check_lease_owner.lua"
));

/// Releases the lease; returns 1 when it was held by the caller's owner token.
const RELEASE_LOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/release_lock.lua"
));

/// Acquires or refreshes the lease and returns an epoch (fencing) token;
/// rejects with `LOCK_HELD:` when another owner holds the lease.
const PROMOTE_LEADER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/promote_leader.lua"
));

/// Appends a produced block to the block stream after epoch/owner fencing
/// checks; rejects with `HEIGHT_EXISTS:` or `FENCING_ERROR:`.
const WRITE_BLOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/write_block.lua"
));

/// Reads a bounded number of block-stream entries starting at a given height.
const READ_STREAM_ENTRIES_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_stream_entries.lua"
));

/// Reads the newest block-stream entry (height and stream id).
const READ_LATEST_STREAM_ENTRY_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_latest_stream_entry.lua"
));
106
/// A single Redis endpoint participating in the leader-lease quorum.
struct RedisNode {
    /// Client used to (re)establish connections to this node.
    redis_client: redis::Client,
    /// Lazily created multiplexed connection, reused across calls and reset
    /// after node errors or timeouts (see `clear_cached_connection`).
    cached_connection: Mutex<Option<redis::aio::MultiplexedConnection>>,
}
111
impl Clone for RedisNode {
    /// Manual impl because `tokio::sync::Mutex<Option<..>>` is not `Clone`;
    /// a clone starts with an empty connection cache and reconnects lazily
    /// on first use.
    fn clone(&self) -> Self {
        Self {
            redis_client: self.redis_client.clone(),
            cached_connection: Mutex::new(None),
        }
    }
}
120
/// Distributed leader-lease adapter backed by a set of independent Redis
/// nodes: operations succeed only when a quorum of nodes agrees, and
/// produced blocks are published to a Redis stream guarded by an epoch
/// (fencing) token.
pub struct RedisLeaderLeaseAdapter {
    /// The Redis nodes that form the quorum set.
    redis_nodes: Vec<RedisNode>,
    /// Number of node-level successes required for an operation to count.
    quorum: usize,
    /// Extra nodes required on top of the simple majority when computing
    /// `quorum` (see `calculate_quorum`).
    quorum_disruption_budget: u32,
    /// Key holding the leader lease.
    lease_key: String,
    /// Key holding the epoch (fencing) token, derived from `lease_key`.
    epoch_key: String,
    /// Key of the stream where produced blocks are published, derived from
    /// `lease_key`.
    block_stream_key: String,
    /// Unique token identifying this process as the lease owner.
    lease_owner_token: String,
    /// Shared by all clones; `Drop` releases the lease only when the last
    /// clone goes away.
    drop_release_guard: std::sync::Arc<()>,
    /// Epoch token adopted after a successful promotion; `None` while not
    /// leading. Shared across clones.
    current_epoch_token: std::sync::Arc<std::sync::Mutex<Option<u64>>>,
    /// Lease time-to-live in milliseconds.
    lease_ttl_millis: u64,
    /// Clock-drift allowance subtracted from the remaining lease validity.
    lease_drift_millis: u64,
    /// Per-node timeout applied to every Redis operation.
    node_timeout: Duration,
    /// Base delay between promotion attempts.
    retry_delay_millis: u64,
    /// Upper bound for the random jitter added to the retry delay.
    max_retry_delay_offset_millis: u64,
    /// Maximum number of promotion attempts (always >= 1).
    max_attempts: usize,
    /// Maximum stream length; also caps blocks read per reconciliation round.
    stream_max_len: u32,
}
139
impl Clone for RedisLeaderLeaseAdapter {
    /// Field-by-field clone. `drop_release_guard` and `current_epoch_token`
    /// are `Arc`s, so all clones share release coordination and the fencing
    /// token; cloned `RedisNode`s start with empty connection caches.
    fn clone(&self) -> Self {
        Self {
            redis_nodes: self.redis_nodes.clone(),
            quorum: self.quorum,
            quorum_disruption_budget: self.quorum_disruption_budget,
            lease_key: self.lease_key.clone(),
            epoch_key: self.epoch_key.clone(),
            block_stream_key: self.block_stream_key.clone(),
            lease_owner_token: self.lease_owner_token.clone(),
            drop_release_guard: self.drop_release_guard.clone(),
            current_epoch_token: self.current_epoch_token.clone(),
            lease_ttl_millis: self.lease_ttl_millis,
            lease_drift_millis: self.lease_drift_millis,
            node_timeout: self.node_timeout,
            retry_delay_millis: self.retry_delay_millis,
            max_retry_delay_offset_millis: self.max_retry_delay_offset_millis,
            max_attempts: self.max_attempts,
            stream_max_len: self.stream_max_len,
        }
    }
}
162
/// Reconciliation adapter that performs no distributed coordination: its
/// `BlockReconciliationReadPort` impl always reports a reconciled leader.
#[derive(Default, Clone)]
pub struct NoopReconciliationAdapter;
165
/// Dispatches between the Redis-backed leader-lease adapter and the no-op
/// adapter used when distributed coordination is not required.
// NOTE(review): presumably selected at service start-up from configuration —
// the construction site is outside this file.
#[allow(clippy::large_enum_variant)]
pub enum ReconciliationAdapter {
    Redis(RedisLeaderLeaseAdapter),
    Noop(NoopReconciliationAdapter),
}
171
172impl RedisLeaderLeaseAdapter {
173 fn calculate_quorum(redis_nodes_len: usize, quorum_disruption_budget: u32) -> usize {
174 let majority = redis_nodes_len
175 .checked_div(2)
176 .unwrap_or(0)
177 .saturating_add(1);
178 let disruption_budget = usize::try_from(quorum_disruption_budget).unwrap_or(0);
179 majority
180 .saturating_add(disruption_budget)
181 .min(redis_nodes_len)
182 }
183
184 #[allow(clippy::too_many_arguments)]
185 pub fn new(
186 redis_urls: Vec<String>,
187 lease_key: String,
188 lease_ttl: Duration,
189 node_timeout: Duration,
190 retry_delay: Duration,
191 max_retry_delay_offset: Duration,
192 max_attempts: u32,
193 stream_max_len: u32,
194 ) -> anyhow::Result<Self> {
195 let redis_nodes = redis_urls
196 .into_iter()
197 .map(|redis_url| {
198 redis::Client::open(redis_url).map(|redis_client| RedisNode {
199 redis_client,
200 cached_connection: Mutex::new(None),
201 })
202 })
203 .collect::<Result<Vec<_>, _>>()?;
204 if redis_nodes.is_empty() {
205 return Err(anyhow!(
206 "At least one redis url is required for leader lock"
207 ));
208 }
209 let quorum_disruption_budget = 0u32;
210 let quorum = Self::calculate_quorum(redis_nodes.len(), quorum_disruption_budget);
211 let lease_ttl_millis = u64::try_from(lease_ttl.as_millis())?;
212 let retry_delay_millis = u64::try_from(retry_delay.as_millis())?;
213 let max_retry_delay_offset_millis =
214 u64::try_from(max_retry_delay_offset.as_millis())?;
215 let max_attempts = usize::try_from(max_attempts)?.max(1);
216 let lease_owner_token = uuid::Uuid::new_v4().to_string();
217 let epoch_key = format!("{lease_key}:epoch:token");
218 let block_stream_key = format!("{lease_key}:block:stream");
219 let lease_drift_millis = lease_ttl_millis
220 .checked_div(100)
221 .unwrap_or(0)
222 .saturating_add(2);
223 Ok(Self {
224 redis_nodes,
225 quorum,
226 quorum_disruption_budget,
227 lease_key,
228 epoch_key,
229 block_stream_key,
230 lease_owner_token,
231 drop_release_guard: std::sync::Arc::new(()),
232 current_epoch_token: std::sync::Arc::new(std::sync::Mutex::new(None)),
233 lease_ttl_millis,
234 lease_drift_millis,
235 node_timeout,
236 retry_delay_millis,
237 max_retry_delay_offset_millis,
238 max_attempts,
239 stream_max_len,
240 })
241 }
242
243 pub fn with_quorum_disruption_budget(
244 mut self,
245 quorum_disruption_budget: u32,
246 ) -> Self {
247 self.quorum_disruption_budget = quorum_disruption_budget;
248 self.quorum =
249 Self::calculate_quorum(self.redis_nodes.len(), quorum_disruption_budget);
250 self
251 }
252
    /// Returns the cached multiplexed connection for `redis_node`, creating
    /// and caching one if none exists yet. Connection establishment is
    /// bounded by `self.node_timeout`.
    async fn multiplexed_connection(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<redis::aio::MultiplexedConnection> {
        // Fast path: reuse the cached connection. The lock guard is dropped
        // before awaiting the connect below.
        if let Some(connection) =
            redis_node.cached_connection.lock().await.as_ref().cloned()
        {
            return Ok(connection);
        }

        let new_connection = timeout(
            self.node_timeout,
            redis_node.redis_client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| anyhow!("Timed out while connecting to redis leader-lock node"))??;
        // Re-check under the lock: another task may have cached a connection
        // while we were connecting — prefer it and drop the fresh one.
        let mut cached_connection = redis_node.cached_connection.lock().await;
        if let Some(connection) = cached_connection.as_ref().cloned() {
            return Ok(connection);
        }
        *cached_connection = Some(new_connection.clone());
        Ok(new_connection)
    }
276
277 async fn clear_cached_connection(&self, redis_node: &RedisNode) {
278 let mut cached_connection = redis_node.cached_connection.lock().await;
279 *cached_connection = None;
280 poa_metrics().connection_reset_total.inc();
281 }
282
283 async fn check_lease_owner_on_node(&self, redis_node: &RedisNode) -> bool {
284 let mut connection = match self.multiplexed_connection(redis_node).await {
285 Ok(connection) => connection,
286 Err(_) => return false,
287 };
288 let is_owner = timeout(
289 self.node_timeout,
290 redis::Script::new(CHECK_LEASE_OWNER_SCRIPT)
291 .key(&self.lease_key)
292 .arg(&self.lease_owner_token)
293 .invoke_async::<i32>(&mut connection),
294 )
295 .await;
296 match is_owner {
297 Ok(Ok(is_owner)) => is_owner == 1,
298 Err(_) => {
299 self.clear_cached_connection(redis_node).await;
300 false
301 }
302 Ok(Err(_)) => {
303 self.clear_cached_connection(redis_node).await;
304 false
305 }
306 }
307 }
308
    /// Attempts to acquire or refresh the lease on one node via the
    /// promote-leader script, returning the node's epoch token on success.
    ///
    /// All failures collapse to `Ok(None)`: connection failures, timeouts and
    /// unexpected script errors (both of which also reset the cached
    /// connection), and a `LOCK_HELD:` rejection — in that case another owner
    /// holds the lease but the node itself answered, so its connection is
    /// kept.
    async fn promote_leader_on_node(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<Option<u64>> {
        let mut connection = match self.multiplexed_connection(redis_node).await {
            Ok(connection) => connection,
            Err(_) => return Ok(None),
        };
        let promoted = timeout(
            self.node_timeout,
            redis::Script::new(PROMOTE_LEADER_SCRIPT)
                .key(&self.lease_key)
                .key(&self.epoch_key)
                .arg(&self.lease_owner_token)
                .arg(self.lease_ttl_millis)
                .invoke_async::<u64>(&mut connection),
        )
        .await;
        match promoted {
            Ok(Ok(token)) => Ok(Some(token)),
            Ok(Err(err)) => {
                // The node responded, so the connection is healthy; the lease
                // is simply owned by someone else.
                if err.to_string().contains("LOCK_HELD:") {
                    return Ok(None);
                }
                self.clear_cached_connection(redis_node).await;
                Ok(None)
            }
            Err(_) => {
                self.clear_cached_connection(redis_node).await;
                Ok(None)
            }
        }
    }
342
343 async fn release_lease_on_node(&self, redis_node: &RedisNode) -> bool {
344 let mut connection = match self.multiplexed_connection(redis_node).await {
345 Ok(connection) => connection,
346 Err(_) => return false,
347 };
348 let released = timeout(
349 self.node_timeout,
350 redis::Script::new(RELEASE_LOCK_SCRIPT)
351 .key(&self.lease_key)
352 .arg(&self.lease_owner_token)
353 .invoke_async::<i32>(&mut connection),
354 )
355 .await;
356 match released {
357 Ok(Ok(released)) => released == 1,
358 Err(_) => {
359 self.clear_cached_connection(redis_node).await;
360 false
361 }
362 Ok(Err(_)) => {
363 self.clear_cached_connection(redis_node).await;
364 false
365 }
366 }
367 }
368
369 fn quorum_reached(&self, success_count: usize) -> bool {
370 success_count >= self.quorum
371 }
372
373 fn calculate_remaining_validity_millis(&self, elapsed_millis: u64) -> u64 {
374 self.lease_ttl_millis
375 .saturating_sub(elapsed_millis.saturating_add(self.lease_drift_millis))
376 }
377
378 fn random_retry_delay_offset_millis(&self) -> u64 {
379 if self.max_retry_delay_offset_millis == 0 {
380 return 0;
381 }
382 rand::random::<u64>()
383 .checked_rem(self.max_retry_delay_offset_millis.saturating_add(1))
384 .unwrap_or(0)
385 }
386
387 async fn release_lease_on_all_nodes(&self) {
388 let _ = futures::future::join_all(
389 self.redis_nodes
390 .iter()
391 .map(|redis_node| self.release_lease_on_node(redis_node)),
392 )
393 .await;
394 }
395
396 async fn delay_next_retry(&self) {
397 let retry_delay_millis = self
398 .retry_delay_millis
399 .saturating_add(self.random_retry_delay_offset_millis());
400 sleep(Duration::from_millis(retry_delay_millis)).await;
401 }
402
    /// Checks whether this adapter owns the lease on a quorum of nodes.
    ///
    /// When quorum ownership holds but some nodes do not report us as owner,
    /// promotion is attempted on those nodes to expand the lease, and any
    /// higher epoch token they return is adopted as the local fencing token.
    async fn has_lease_owner_quorum(&self) -> anyhow::Result<bool> {
        let ownership = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.check_lease_owner_on_node(redis_node)),
        )
        .await;
        let owner_count = ownership.iter().filter(|&&is_owner| is_owner).count();
        if !self.quorum_reached(owner_count) {
            return Ok(false);
        }

        // Nodes that did not confirm us as owner (failed, timed out, or hold
        // no/other lease state).
        let non_owned: Vec<&RedisNode> = self
            .redis_nodes
            .iter()
            .zip(ownership.iter())
            .filter(|(_, is_owner)| !**is_owner)
            .map(|(node, _)| node)
            .collect();

        if !non_owned.is_empty() {
            // Expand the lease onto the non-owning nodes; failures are
            // tolerated because quorum is already established above.
            let results = futures::future::join_all(
                non_owned
                    .into_iter()
                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
            )
            .await;

            // Adopt the highest epoch observed so later writes fence with an
            // up-to-date token. Skipped silently if the lock is poisoned.
            if let Some(max_new) =
                results.into_iter().filter_map(|r| r.ok().flatten()).max()
                && let Ok(mut epoch) = self.current_epoch_token.lock()
            {
                let current = epoch.unwrap_or(0);
                if max_new > current {
                    tracing::debug!(
                        old_epoch = current,
                        new_epoch = max_new,
                        "Adopted higher epoch from lock expansion"
                    );
                    *epoch = Some(max_new);
                }
            }
        }

        Ok(true)
    }
455
    /// Tries to acquire the lease on a quorum of nodes, retrying up to
    /// `max_attempts` times with jittered delays between attempts.
    ///
    /// An attempt succeeds when a quorum of nodes grants promotion AND the
    /// lease still has positive validity after subtracting the attempt's
    /// duration and the drift allowance. On success the highest granted
    /// epoch token becomes the current fencing token. After a failed
    /// attempt, partially acquired locks are released before retrying.
    async fn acquire_lease_if_free(&self) -> anyhow::Result<bool> {
        let promotion_start = std::time::Instant::now();
        for attempt_index in 0..self.max_attempts {
            let start = std::time::Instant::now();
            let promoted_nodes = futures::future::join_all(
                self.redis_nodes
                    .iter()
                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
            )
            .await;
            let promoted_tokens = promoted_nodes
                .into_iter()
                .filter_map(|token| token.ok().flatten())
                .collect::<Vec<_>>();
            let acquired_count = promoted_tokens.len();
            let elapsed_millis =
                u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
            let validity_millis =
                self.calculate_remaining_validity_millis(elapsed_millis);
            if self.quorum_reached(acquired_count) && validity_millis > 0 {
                // Record how far apart the per-node epoch tokens drifted.
                if promoted_tokens.len() > 1
                    && let (Some(min_tok), Some(max_tok)) = (
                        promoted_tokens.iter().copied().min(),
                        promoted_tokens.iter().copied().max(),
                    )
                {
                    poa_metrics().epoch_max_drift.set(
                        i64::try_from(max_tok.saturating_sub(min_tok))
                            .unwrap_or(i64::MAX),
                    );
                }
                // Adopt the highest granted epoch as the fencing token.
                if let Some(max_token) = promoted_tokens.into_iter().max() {
                    let mut current_epoch_token = self
                        .current_epoch_token
                        .lock()
                        .map_err(|e| anyhow!("epoch token lock poisoned: {}", e))?;
                    *current_epoch_token = Some(max_token);
                    poa_metrics()
                        .leader_epoch
                        .set(i64::try_from(max_token).unwrap_or(i64::MAX));
                }
                poa_metrics().promotion_success_total.inc();
                poa_metrics()
                    .promotion_duration_s
                    .observe(promotion_start.elapsed().as_secs_f64());
                return Ok(true);
            }
            // No quorum (or lease effectively expired during the attempt):
            // roll back any partial locks before the next try.
            self.release_lease_on_all_nodes().await;
            let is_last_attempt = attempt_index.saturating_add(1) == self.max_attempts;
            if !is_last_attempt {
                self.delay_next_retry().await;
            }
        }
        poa_metrics().promotion_failure_total.inc();
        poa_metrics()
            .promotion_duration_s
            .observe(promotion_start.elapsed().as_secs_f64());
        Ok(false)
    }
516
517 async fn release_lease_on_client(
518 redis_client: redis::Client,
519 lease_key: String,
520 lease_owner_token: String,
521 node_timeout: Duration,
522 ) {
523 let connection = timeout(
524 node_timeout,
525 redis_client.get_multiplexed_async_connection(),
526 )
527 .await;
528 let mut connection = match connection {
529 Ok(Ok(connection)) => connection,
530 Err(_) => return,
531 Ok(Err(_)) => return,
532 };
533 let _ = timeout(
534 node_timeout,
535 redis::Script::new(RELEASE_LOCK_SCRIPT)
536 .key(lease_key)
537 .arg(lease_owner_token)
538 .invoke_async::<i32>(&mut connection),
539 )
540 .await;
541 }
542
543 async fn release_lease_on_clients(
544 redis_clients: Vec<redis::Client>,
545 lease_key: String,
546 lease_owner_token: String,
547 node_timeout: Duration,
548 ) {
549 let _ =
550 futures::future::join_all(redis_clients.into_iter().map(|redis_client| {
551 Self::release_lease_on_client(
552 redis_client,
553 lease_key.clone(),
554 lease_owner_token.clone(),
555 node_timeout,
556 )
557 }))
558 .await;
559 }
560
561 fn release_lease_on_clients_sync(
562 redis_clients: Vec<redis::Client>,
563 lease_key: String,
564 lease_owner_token: String,
565 ) {
566 redis_clients.into_iter().for_each(|redis_client| {
567 let Ok(mut connection) = redis_client.get_connection() else {
568 return;
569 };
570 let _ = redis::Script::new(RELEASE_LOCK_SCRIPT)
571 .key(&lease_key)
572 .arg(&lease_owner_token)
573 .invoke::<i32>(&mut connection);
574 });
575 }
576
    /// Reads the newest `(height, stream_id)` entry from one node's block
    /// stream; `Ok(None)` when the node has no entry. Timeouts and script
    /// errors reset the node's cached connection and are returned as errors.
    async fn read_latest_stream_entry_on_node(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<Option<(u32, String)>> {
        let mut connection = self.multiplexed_connection(redis_node).await?;
        let latest_entry = timeout(
            self.node_timeout,
            redis::Script::new(READ_LATEST_STREAM_ENTRY_SCRIPT)
                .key(&self.block_stream_key)
                .invoke_async::<Vec<String>>(&mut connection),
        )
        .await;
        match latest_entry {
            Err(_) => {
                self.clear_cached_connection(redis_node).await;
                Err(anyhow!(
                    "Timed out reading latest stream entry from Redis node"
                ))
            }
            Ok(Err(e)) => {
                self.clear_cached_connection(redis_node).await;
                Err(anyhow!(
                    "Failed to read latest stream entry from Redis node: {e}"
                ))
            }
            Ok(Ok(entry)) => {
                // Anything but a [height, stream_id] pair means "no entry".
                if entry.len() != 2 {
                    return Ok(None);
                }
                let height = entry[0]
                    .parse::<u32>()
                    .map_err(|e| anyhow!("Invalid latest stream entry height: {e}"))?;
                Ok(Some((height, entry[1].clone())))
            }
        }
    }
613
    /// Determines whether the Redis stream holds blocks at or above
    /// `next_height` that this node has not committed yet.
    ///
    /// Errors unless a quorum of nodes could be read; otherwise returns
    /// `true` when at least one node reports a latest stream height at or
    /// above `next_height`.
    async fn should_reconcile_from_stream(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<bool> {
        let next_height = u32::from(next_height);
        let latest_results = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.read_latest_stream_entry_on_node(redis_node)),
        )
        .await;
        let mut successful_reads = 0usize;
        let mut failed_count = 0usize;
        let mut nodes_indicating_backlog = 0usize;
        for result in latest_results {
            match result {
                Ok(Some((latest_height, _latest_stream_id))) => {
                    successful_reads = successful_reads.saturating_add(1);
                    if latest_height >= next_height {
                        nodes_indicating_backlog =
                            nodes_indicating_backlog.saturating_add(1);
                    }
                }
                // An empty stream is a successful read with no backlog.
                Ok(None) => {
                    successful_reads = successful_reads.saturating_add(1);
                }
                Err(e) => {
                    tracing::warn!("Redis latest stream read failed: {e}");
                    failed_count = failed_count.saturating_add(1);
                }
            }
        }
        if !self.quorum_reached(successful_reads) {
            return Err(anyhow!(
                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
                successful_reads,
                self.redis_nodes.len(),
                failed_count
            ));
        }
        // A single node indicating backlog is enough to trigger a full read.
        Ok(nodes_indicating_backlog > 0)
    }
656
657 async fn read_stream_entries_on_node(
658 &self,
659 redis_node: &RedisNode,
660 next_height: u32,
661 max_entries: usize,
662 ) -> anyhow::Result<Vec<(u32, u64, SealedBlock)>> {
663 if max_entries == 0 {
664 return Ok(Vec::new());
665 }
666
667 let mut connection = self.multiplexed_connection(redis_node).await?;
668 let count = u32::try_from(max_entries).unwrap_or(u32::MAX);
669 let stream_entries = timeout(
670 self.node_timeout,
671 redis::Script::new(READ_STREAM_ENTRIES_SCRIPT)
672 .key(&self.block_stream_key)
673 .arg(next_height)
674 .arg(count)
675 .invoke_async::<Vec<(u32, u64, Vec<u8>, String)>>(&mut connection),
676 )
677 .await;
678
679 let entries = match stream_entries {
680 Err(_) => {
681 self.clear_cached_connection(redis_node).await;
682 return Err(anyhow!("Timed out reading stream entries from Redis node"));
683 }
684 Ok(Err(e)) => {
685 self.clear_cached_connection(redis_node).await;
686 return Err(anyhow!(
687 "Failed to read stream entries from Redis node: {e}"
688 ));
689 }
690 Ok(Ok(entries)) => entries,
691 };
692
693 let mut blocks = Vec::new();
694 for (height, epoch, bytes, _stream_id) in entries {
695 match postcard::from_bytes::<SealedBlock>(&bytes) {
696 Ok(block) => blocks.push((height, epoch, block)),
697 Err(e) => {
698 tracing::warn!(
699 "Skipping stream entry: failed to deserialize block at height {height}: {e}"
700 );
701 }
702 }
703 }
704
705 Ok(blocks)
706 }
707
    /// Collects the quorum-agreed backlog of blocks from the Redis stream,
    /// starting at `next_height`.
    ///
    /// Flow:
    /// 1. Cheap check — return early when no node indicates a backlog.
    /// 2. Read up to `stream_max_len` entries from every node; error unless
    ///    a quorum of nodes responded.
    /// 3. Walk heights upward: at each height the candidate written under
    ///    the highest epoch wins, and it must be held by a quorum of nodes;
    ///    a sub-quorum winner triggers a repair attempt first.
    ///
    /// Errors when a backlog was indicated but not even the first block
    /// could be reconciled; otherwise returns the contiguous run of
    /// reconciled blocks gathered so far.
    async fn unreconciled_blocks(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<Vec<SealedBlock>> {
        if !self.should_reconcile_from_stream(next_height).await? {
            return Ok(Vec::new());
        }
        let mut reconciled = Vec::new();
        let max_reconcile_blocks_per_round =
            usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
        let next_height_u32 = u32::from(next_height);
        let read_results =
            futures::future::join_all(self.redis_nodes.iter().map(|redis_node| {
                self.read_stream_entries_on_node(
                    redis_node,
                    next_height_u32,
                    max_reconcile_blocks_per_round,
                )
            }))
            .await;

        let mut successful_reads = Vec::new();
        let mut failed_count = 0usize;
        for result in read_results {
            match result {
                Ok(entries) => successful_reads.push(entries),
                Err(e) => {
                    tracing::warn!("Redis stream read failed: {e}");
                    failed_count = failed_count.saturating_add(1);
                }
            }
        }

        if !self.quorum_reached(successful_reads.len()) {
            return Err(anyhow!(
                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
                successful_reads.len(),
                self.redis_nodes.len(),
                failed_count
            ));
        }

        // Per node: height -> (epoch -> block), so different epochs at the
        // same height stay distinguishable for the vote below.
        let blocks_by_node = successful_reads
            .into_iter()
            .map(|entries| {
                entries.into_iter().fold(
                    HashMap::<u32, HashMap<u64, SealedBlock>>::new(),
                    |mut blocks_by_height, (height, epoch, block)| {
                        blocks_by_height
                            .entry(height)
                            .or_default()
                            .insert(epoch, block);
                        blocks_by_height
                    },
                )
            })
            .collect::<Vec<_>>();

        // Metric: distance between the oldest retained stream entry and our
        // last committed height — how much trim headroom remains.
        let min_stream_height = blocks_by_node
            .iter()
            .flat_map(|blocks_by_height| blocks_by_height.keys().copied())
            .min();
        if let Some(min_h) = min_stream_height {
            let local_committed = i64::from(u32::from(next_height).saturating_sub(1));
            let headroom = i64::from(min_h).saturating_sub(local_committed);
            poa_metrics().stream_trim_headroom.set(headroom);
        }

        let mut current_height = u32::from(next_height);

        for _ in 0..max_reconcile_blocks_per_round {
            let nodes_with_height = blocks_by_node
                .iter()
                .filter(|blocks_by_height| blocks_by_height.contains_key(&current_height))
                .count();

            tracing::debug!(
                "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
                blocks_by_node.len()
            );

            if nodes_with_height == 0 {
                // Nothing anywhere at this height: either we reached the
                // stream tip (done) or the indicated backlog is missing.
                if reconciled.is_empty() {
                    return Err(anyhow!(
                        "Backlog unresolved at height {current_height}: \
                        stream indicates backlog but no entries found at next height"
                    ));
                }
                break;
            }

            // Count, per (epoch, block-id) candidate, how many nodes hold it.
            let votes = blocks_by_node
                .iter()
                .filter_map(|blocks_by_height| blocks_by_height.get(&current_height))
                .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
                .fold(
                    HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(),
                    |mut votes, (epoch, block)| {
                        let vote_key = (*epoch, block.entity.id());
                        match votes.get_mut(&vote_key) {
                            Some((count, _)) => {
                                *count = count.saturating_add(1);
                            }
                            None => {
                                votes.insert(vote_key, (1, block.clone()));
                            }
                        }
                        votes
                    },
                );

            // The candidate written under the highest epoch wins the height.
            let winner = votes
                .into_iter()
                .max_by_key(|((epoch, _), _)| *epoch)
                .map(|(_, (count, block))| (count, block));

            if let Some((count, block)) = winner {
                if self.quorum_reached(count) {
                    reconciled.push(block);
                } else {
                    // Winner exists but on too few nodes: try to replicate it
                    // up to quorum before accepting it.
                    tracing::info!(
                        "Repairing sub-quorum block at height {current_height} \
                        (found on {count}/{} nodes)",
                        blocks_by_node.len()
                    );
                    match self.repair_sub_quorum_block(&block, count) {
                        Ok(true) => {
                            tracing::info!(
                                "Repair succeeded — block at height {current_height} \
                                now has quorum"
                            );
                            reconciled.push(block);
                        }
                        Ok(false) => {
                            tracing::warn!(
                                "Repair failed to reach quorum at height \
                                {current_height} — will retry next round"
                            );
                            if reconciled.is_empty() {
                                return Err(anyhow!(
                                    "Backlog unresolved at height {current_height}: \
                                    repair failed to reach quorum"
                                ));
                            }
                            break;
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Repair error at height {current_height}: {e}"
                            );
                            if reconciled.is_empty() {
                                return Err(anyhow!(
                                    "Backlog unresolved at height {current_height}: \
                                    repair error: {e}"
                                ));
                            }
                            break;
                        }
                    }
                }
            } else {
                if reconciled.is_empty() {
                    return Err(anyhow!(
                        "Backlog unresolved at height {current_height}: \
                        no winning block candidate"
                    ));
                }
                break;
            }

            let Some(next) = current_height.checked_add(1) else {
                break;
            };
            current_height = next;
        }

        Ok(reconciled)
    }
892
893 async fn can_produce_block(&self) -> anyhow::Result<bool> {
894 tracing::debug!("Checking Redis leader lock");
895 if self.has_lease_owner_quorum().await? {
896 return Ok(true);
897 }
898 self.acquire_lease_if_free().await
899 }
900
    /// Releases the lease when this adapter still owns it on a quorum.
    ///
    /// If quorum ownership already lapsed, only the local fencing token is
    /// cleared (nothing to release remotely). Otherwise the lease is
    /// released on every node; an error is returned unless a quorum of
    /// nodes confirms the release.
    async fn release_if_owner(&self) -> anyhow::Result<()> {
        tracing::debug!("Releasing Redis leader lock");
        if !self.has_lease_owner_quorum().await? {
            // No longer the owner: forget the stale fencing token.
            let mut current_epoch_token = self
                .current_epoch_token
                .lock()
                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
            *current_epoch_token = None;
            return Ok(());
        }

        let releases = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.release_lease_on_node(redis_node)),
        )
        .await;
        let released_count = releases.into_iter().filter(|released| *released).count();
        if self.quorum_reached(released_count) {
            let mut current_epoch_token = self
                .current_epoch_token
                .lock()
                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
            *current_epoch_token = None;
            Ok(())
        } else {
            Err(anyhow!("Failed to release lease on quorum"))
        }
    }
930
    /// Runs the fenced write-block script against one node over a blocking
    /// connection (connect, read, and write each bounded by `node_timeout`).
    ///
    /// Maps the script's sentinel errors to [`WriteBlockResult`]:
    /// `HEIGHT_EXISTS:` (an entry already exists at this height) and
    /// `FENCING_ERROR:` (epoch/owner check failed); any other error is
    /// propagated after bumping the error metric.
    // NOTE(review): this blocks the calling thread; some callers reach it
    // from async contexts — confirm that is acceptable or consider
    // `spawn_blocking`.
    fn publish_block_on_node(
        &self,
        redis_node: &RedisNode,
        epoch: u64,
        block: &SealedBlock,
        block_data: &[u8],
    ) -> anyhow::Result<WriteBlockResult> {
        let mut connection = redis_node
            .redis_client
            .get_connection_with_timeout(self.node_timeout)?;
        connection.set_read_timeout(Some(self.node_timeout))?;
        connection.set_write_timeout(Some(self.node_timeout))?;
        let block_height = u32::from(*block.entity.header().height());
        let lua_start = std::time::Instant::now();
        let write_result = redis::Script::new(WRITE_BLOCK_SCRIPT)
            .key(&self.block_stream_key)
            .key(&self.epoch_key)
            .key(&self.lease_key)
            .arg(epoch)
            .arg(&self.lease_owner_token)
            .arg(block_height)
            .arg(block_data)
            .arg(self.lease_ttl_millis)
            .arg(self.stream_max_len)
            .invoke::<String>(&mut connection);
        poa_metrics()
            .write_block_duration_s
            .observe(lua_start.elapsed().as_secs_f64());
        match write_result {
            Ok(_) => {
                poa_metrics().write_block_success_total.inc();
                Ok(WriteBlockResult::Written)
            }
            Err(err) if err.to_string().contains("HEIGHT_EXISTS:") => {
                poa_metrics().write_block_height_exists_total.inc();
                tracing::debug!(
                    "write_block: height already exists (height={block_height})"
                );
                Ok(WriteBlockResult::HeightExists)
            }
            Err(err) if err.to_string().contains("FENCING_ERROR:") => {
                poa_metrics().write_block_fencing_error_total.inc();
                tracing::warn!(
                    "write_block: fencing rejected (height={block_height}): {err}"
                );
                Ok(WriteBlockResult::FencingRejected)
            }
            Err(err) => {
                poa_metrics().write_block_error_total.inc();
                Err(err.into())
            }
        }
    }
984
    /// Re-publishes a block present on fewer than a quorum of nodes,
    /// attempting to bring it up to quorum. Returns `Ok(true)` when at least
    /// a quorum holds the block after the writes.
    ///
    /// `pre_existing_count` is the number of nodes already holding this
    /// block; nodes answering `HeightExists` are therefore not counted
    /// again. A fencing rejection aborts the repair entirely — another
    /// leader has taken over.
    fn repair_sub_quorum_block(
        &self,
        block: &SealedBlock,
        pre_existing_count: usize,
    ) -> anyhow::Result<bool> {
        // The current fencing token is required so a stale leader cannot
        // overwrite newer state during repair.
        let epoch = match *self
            .current_epoch_token
            .lock()
            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
        {
            Some(epoch) => epoch,
            None => {
                return Err(anyhow!(
                    "Cannot repair block because fencing token is not initialized"
                ));
            }
        };
        let block_data = postcard::to_allocvec(block)?;
        let mut total_with_block = pre_existing_count;
        // Sequential, blocking writes to every node; per-node errors are
        // tolerated and simply do not add to the count.
        for redis_node in &self.redis_nodes {
            match self.publish_block_on_node(redis_node, epoch, block, &block_data) {
                Ok(WriteBlockResult::Written) => {
                    total_with_block = total_with_block.saturating_add(1);
                }
                // Already has an entry at this height; covered by
                // `pre_existing_count` when it is this exact block.
                Ok(WriteBlockResult::HeightExists) => {
                }
                Ok(WriteBlockResult::FencingRejected) => {
                    return Err(anyhow!(
                        "Lost lock during repair — another leader took over"
                    ));
                }
                Err(err) => {
                    tracing::debug!("Repair write to node failed: {err}");
                }
            }
        }
        let reached_quorum = self.quorum_reached(total_with_block);
        if reached_quorum {
            poa_metrics().repair_success_total.inc();
        } else {
            poa_metrics().repair_failure_total.inc();
        }
        Ok(reached_quorum)
    }
1052}
1053
/// Outcome of running the write-block script on a single node.
enum WriteBlockResult {
    /// The block was appended to the node's stream.
    Written,
    /// The node already has an entry at this height (`HEIGHT_EXISTS:`).
    HeightExists,
    /// The node's fencing check rejected the write (`FENCING_ERROR:`).
    FencingRejected,
}
1063
1064impl PoAAdapter {
1065 pub fn new(shared_state: Option<SharedState>) -> Self {
1066 Self { shared_state }
1067 }
1068
1069 pub async fn manually_produce_blocks(
1070 &self,
1071 start_time: Option<Tai64>,
1072 mode: Mode,
1073 ) -> anyhow::Result<()> {
1074 self.shared_state
1075 .as_ref()
1076 .ok_or(anyhow!("The block production is disabled"))?
1077 .manually_produce_block(start_time, mode)
1078 .await
1079 }
1080}
1081
#[async_trait::async_trait]
impl BlockReconciliationReadPort for NoopReconciliationAdapter {
    /// Always reports this node as a reconciled leader — no coordination.
    async fn leader_state(
        &self,
        _next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        Ok(LeaderState::ReconciledLeader)
    }

    /// Nothing to release without a distributed lease.
    async fn release(&self) -> anyhow::Result<()> {
        Ok(())
    }
}
1095
#[async_trait::async_trait]
impl BlockReconciliationReadPort for RedisLeaderLeaseAdapter {
    /// Reports this node's leadership and reconciliation state for
    /// `next_height`.
    ///
    /// Leaders additionally scan the Redis stream: an empty backlog yields
    /// `ReconciledLeader`, otherwise the missing blocks come back as
    /// `UnreconciledBlocks` for the caller to apply. Non-leaders always get
    /// `ReconciledFollower`.
    async fn leader_state(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        if self.can_produce_block().await? {
            poa_metrics().is_leader.set(1);
            // Export the epoch we fence with; skipped on lock poisoning or
            // when no token was adopted yet.
            if let Ok(epoch) = self.current_epoch_token.lock()
                && let Some(epoch) = *epoch
            {
                poa_metrics()
                    .leader_epoch
                    .set(i64::try_from(epoch).unwrap_or(i64::MAX));
            }
            let reconcile_start = std::time::Instant::now();
            let unreconciled_blocks = self.unreconciled_blocks(next_height).await?;
            poa_metrics()
                .reconciliation_duration_s
                .observe(reconcile_start.elapsed().as_secs_f64());
            if unreconciled_blocks.is_empty() {
                Ok(LeaderState::ReconciledLeader)
            } else {
                Ok(LeaderState::UnreconciledBlocks(unreconciled_blocks))
            }
        } else {
            poa_metrics().is_leader.set(0);
            Ok(LeaderState::ReconciledFollower)
        }
    }

    /// Releases the leader lease when currently owned; see `release_if_owner`.
    async fn release(&self) -> anyhow::Result<()> {
        self.release_if_owner().await
    }
}
1131
#[async_trait::async_trait]
impl BlockReconciliationReadPort for ReconciliationAdapter {
    /// Delegates to the wrapped adapter variant.
    async fn leader_state(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        match self {
            Self::Redis(adapter) => adapter.leader_state(next_height).await,
            Self::Noop(adapter) => adapter.leader_state(next_height).await,
        }
    }

    /// Delegates to the wrapped adapter variant.
    async fn release(&self) -> anyhow::Result<()> {
        match self {
            Self::Redis(adapter) => adapter.release().await,
            Self::Noop(adapter) => adapter.release().await,
        }
    }
}
1151
impl Drop for RedisLeaderLeaseAdapter {
    /// Best-effort lease release when the last clone of the adapter drops.
    ///
    /// Inside a tokio runtime the release runs as a detached task with a
    /// 100 ms overall budget; outside a runtime it falls back to blocking
    /// connections.
    fn drop(&mut self) {
        // Clones share `drop_release_guard`; only the final drop releases.
        if std::sync::Arc::strong_count(&self.drop_release_guard) != 1 {
            return;
        }

        let redis_clients = self
            .redis_nodes
            .iter()
            .map(|redis_node| redis_node.redis_client.clone())
            .collect::<Vec<_>>();
        if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
            let release_future = timeout(
                Duration::from_millis(100),
                Self::release_lease_on_clients(
                    redis_clients,
                    self.lease_key.clone(),
                    self.lease_owner_token.clone(),
                    self.node_timeout,
                ),
            );
            // Detached task: drop must not block the async runtime.
            drop(runtime_handle.spawn(async move {
                if release_future.await.is_err() {
                    error!("Failed to release leader lease: timeout");
                }
            }));
            return;
        }

        // No runtime available: release synchronously over blocking
        // connections instead.
        Self::release_lease_on_clients_sync(
            redis_clients,
            self.lease_key.clone(),
            self.lease_owner_token.clone(),
        );
    }
}
1188
impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter {
    /// Publishes a locally produced block to the Redis stream on a quorum of
    /// nodes, fenced by the current epoch token.
    ///
    /// The genesis block is skipped silently when no fencing token exists
    /// yet; any other block published before a token was adopted is an
    /// error. Only `Written` outcomes count toward the quorum —
    /// `HeightExists` and fencing rejections do not.
    fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
        let epoch = match *self
            .current_epoch_token
            .lock()
            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
        {
            Some(epoch) => epoch,
            None => {
                if matches!(
                    block.consensus,
                    fuel_core_types::blockchain::consensus::Consensus::Genesis(_)
                ) {
                    tracing::debug!(
                        "Skipping redis block publish for genesis block because fencing token is not initialized"
                    );
                    return Ok(());
                }
                return Err(anyhow!(
                    "Cannot publish block because fencing token is not initialized"
                ));
            }
        };
        let block_data = postcard::to_allocvec(block)?;
        // Sequential, blocking writes to each node; count clean successes.
        let successes = self
            .redis_nodes
            .iter()
            .map(|redis_node| {
                match self.publish_block_on_node(redis_node, epoch, block, &block_data) {
                    Ok(WriteBlockResult::Written) => true,
                    Ok(_) => false,
                    Err(err) => {
                        tracing::debug!("Redis publish on node failed: {err}");
                        false
                    }
                }
            })
            .filter(|success| *success)
            .count();
        if self.quorum_reached(successes) {
            Ok(())
        } else {
            Err(anyhow!(
                "Failed to publish block to redis quorum with fencing checks"
            ))
        }
    }
}
1237
#[async_trait::async_trait]
impl ConsensusModulePort for PoAAdapter {
    /// GraphQL-facing entry point for manual block production; forwards to the
    /// inherent `manually_produce_blocks` with `Mode::Blocks`.
    async fn manually_produce_blocks(
        &self,
        start_time: Option<Tai64>,
        number_of_blocks: u32,
    ) -> anyhow::Result<()> {
        self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
            .await
    }
}
1249
#[cfg(feature = "p2p")]
impl P2pPort for P2PAdapter {
    /// Streams the reserved-peers count from the p2p service. Lagged broadcast
    /// entries are dropped (`result.ok()`); with no service configured the
    /// stream never yields.
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        match &self.service {
            Some(service) => {
                let counts = BroadcastStream::new(service.subscribe_reserved_peers_count())
                    .filter_map(|result| result.ok());
                Box::pin(counts)
            }
            None => Box::pin(tokio_stream::pending()),
        }
    }
}
1263
#[cfg(not(feature = "p2p"))]
impl P2pPort for P2PAdapter {
    /// Without the `p2p` feature there are no peers to observe; the returned
    /// stream is pending forever.
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        Box::pin(tokio_stream::pending())
    }
}
1270
/// Source of predefined blocks stored as `<height>.json` files inside a
/// directory on disk.
pub struct InDirectoryPredefinedBlocks {
    // `None` means predefined blocks are disabled; lookups return no block.
    path_to_directory: Option<PathBuf>,
}
1274
impl InDirectoryPredefinedBlocks {
    /// Creates a new source; pass `None` to disable predefined-block lookups.
    pub fn new(path_to_directory: Option<PathBuf>) -> Self {
        Self { path_to_directory }
    }
}
1280
1281impl PredefinedBlocks for InDirectoryPredefinedBlocks {
1282 fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
1283 let Some(path) = &self.path_to_directory else {
1284 return Ok(None);
1285 };
1286
1287 let block_height: u32 = (*height).into();
1288 if block_exists(path.as_path(), block_height) {
1289 let block_path = block_path(path.as_path(), block_height);
1290 let block_bytes = std::fs::read(block_path)?;
1291 let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
1292 Ok(Some(block))
1293 } else {
1294 Ok(None)
1295 }
1296 }
1297}
1298
/// Builds the on-disk path of the predefined block for `block_height`:
/// `<path_to_directory>/<block_height>.json`.
pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
    let file_name = format!("{}.json", block_height);
    path_to_directory.join(file_name)
}
1302
1303pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
1304 block_path(path_to_directory, block_height).exists()
1305}
1306
impl TransactionPool for TxPoolAdapter {
    /// Watch channel that is notified whenever new executable transactions
    /// become available in the pool.
    fn new_txs_watcher(&self) -> watch::Receiver<()> {
        self.service.get_new_executable_txs_notifier()
    }
}
1312
#[async_trait::async_trait]
impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
    /// Produces and executes a block at `height`, sourcing transactions either
    /// from the tx pool (production bounded by `deadline`) or from an explicit
    /// transaction list.
    async fn produce_and_execute_block(
        &self,
        height: BlockHeight,
        block_time: Tai64,
        source: TransactionsSource,
        deadline: Instant,
    ) -> anyhow::Result<UncommittedResult<Changes>> {
        match source {
            TransactionsSource::TxPool => {
                self.block_producer
                    .produce_and_execute_block_txpool(height, block_time, deadline)
                    .await
            }
            TransactionsSource::SpecificTransactions(txs) => {
                self.block_producer
                    .produce_and_execute_block_transactions(height, block_time, txs)
                    .await
            }
        }
    }

    /// Executes an already-assembled (predefined) block.
    async fn produce_predefined_block(
        &self,
        block: &Block,
    ) -> anyhow::Result<UncommittedResult<Changes>> {
        self.block_producer
            .produce_and_execute_predefined(block, ())
            .await
    }
}
1345
#[async_trait::async_trait]
impl BlockImporter for BlockImporterAdapter {
    /// Commits an already-executed block import result.
    async fn commit_result(
        &self,
        result: UncommittedImporterResult<Changes>,
    ) -> anyhow::Result<()> {
        self.block_importer
            .commit_result(result)
            .await
            .map_err(Into::into)
    }

    /// Executes a sealed block and commits it in a single step.
    async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> {
        self.block_importer
            .execute_and_commit(block)
            .await
            .map_err(Into::into)
    }

    /// Stream of block-import notifications; lagged broadcast entries are
    /// silently dropped (`result.ok()`).
    fn block_stream(&self) -> BoxStream<BlockImportInfo> {
        Box::pin(
            BroadcastStream::new(self.block_importer.subscribe())
                .filter_map(|result| result.ok())
                .map(|result| BlockImportInfo::from(result.shared_result)),
        )
    }

    /// Latest committed block height from the database, `None` when empty.
    fn latest_block_height(&self) -> anyhow::Result<Option<BlockHeight>> {
        self.database.latest_block_height().map_err(Into::into)
    }
}
1377
1378#[cfg(all(test, feature = "leader_lock", not(feature = "not_leader_lock")))]
1379#[allow(non_snake_case)]
1380mod tests {
1381 use super::*;
1382 use fuel_core_importer::ports::BlockReconciliationWritePort;
1383 use fuel_core_poa::ports::BlockReconciliationReadPort;
1384 use fuel_core_types::blockchain::consensus::Consensus;
1385 use std::{
1386 net::{
1387 SocketAddrV4,
1388 TcpListener,
1389 TcpStream,
1390 },
1391 process::{
1392 Child,
1393 Command,
1394 Stdio,
1395 },
1396 thread,
1397 time::Duration,
1398 };
1399
    // Two stream entries for the same height carry different epoch tokens
    // (1 vs 2); reconciliation must surface only the higher-epoch block.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_same_height_has_multiple_stream_entries_then_returns_highest_epoch_block()
    {
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:stream-conflict".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");

        let low_epoch_block = poa_block_at_time(1, 10);
        let high_epoch_block = poa_block_at_time(1, 20);

        let low_epoch_data =
            postcard::to_allocvec(&low_epoch_block).expect("serialize block");
        let high_epoch_data =
            postcard::to_allocvec(&high_epoch_block).expect("serialize block");

        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &low_epoch_data, 1);
        append_stream_block(&mut conn, &stream_key, 1, &high_epoch_data, 2);

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(unreconciled_blocks.len(), 1);
        assert_eq!(
            unreconciled_blocks[0].entity.header().time(),
            high_epoch_block.entity.header().time(),
            "Expected reconciliation to pick the highest epoch block for the same height",
        );
    }
1453
    // With two entries for the same height AND the same epoch, the later
    // stream entry (a retry) must win over the stale one.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_same_height_same_epoch_has_multiple_stream_entries_then_keeps_latest_entry()
    {
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:equal-epoch-latest-entry".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");

        let stale_block = poa_block_at_time(1, 10);
        let retry_block = poa_block_at_time(1, 20);
        let stale_data =
            postcard::to_allocvec(&stale_block).expect("stale block should serialize");
        let retry_data =
            postcard::to_allocvec(&retry_block).expect("retry block should serialize");

        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &stale_data, 1);
        append_stream_block(&mut conn, &stream_key, 1, &retry_data, 1);

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(unreconciled_blocks.len(), 1);
        assert_eq!(
            unreconciled_blocks[0].entity.id(),
            retry_block.entity.id(),
            "Expected reconciliation to keep latest stream entry for equal epoch",
        );
    }
1506
    // Two nodes disagree on the block for the same height at the same epoch;
    // reconciliation should repair the split and settle on a single block.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_height_has_disagreeing_block_ids_then_repairs_with_highest_epoch_block()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-quorum-block-mismatch".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = new_test_adapter(
            vec![
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            lease_key,
        );
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        let block_a = poa_block_at_time(1, 10);
        let block_b = poa_block_at_time(1, 20);
        let block_a_data =
            postcard::to_allocvec(&block_a).expect("block a should serialize");
        let block_b_data =
            postcard::to_allocvec(&block_b).expect("block b should serialize");

        let redis_a_client =
            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
        let redis_b_client =
            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
        let mut conn_a = redis_a_client
            .get_connection()
            .expect("redis a connection should open");
        let mut conn_b = redis_b_client
            .get_connection()
            .expect("redis b connection should open");

        // Same height and epoch (7), but different blocks on nodes a and b.
        append_stream_block(&mut conn_a, &stream_key, 1, &block_a_data, 7);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_b_data, 7);

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        assert!(
            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
            "Expected repair to pick one block and reach quorum, got {leader_state:?}",
        );
    }
1567
    // An entry present on only one of three nodes (below quorum) must be
    // repaired and still surface as an unreconciled block.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:below-quorum".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = new_test_adapter(
            vec![
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            lease_key,
        );
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        let orphan_block = poa_block_at_time(1, 10);
        let orphan_block_data =
            postcard::to_allocvec(&orphan_block).expect("orphan block should serialize");

        let redis_client =
            redis::Client::open(redis_a.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &orphan_block_data, 1);

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        assert!(
            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
            "Expected sub-quorum entry to be repaired and returned, got {leader_state:?}"
        );
    }
1616
    // Heights 1-2 are on a quorum of nodes while height 3 is on just one;
    // reconciliation must return all three, repairing the sub-quorum tail.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_contiguous_heights_have_quorum_then_repairs_sub_quorum_tail()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:contiguous-quorum".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = new_test_adapter(
            vec![
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            lease_key,
        );
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        let h1 = poa_block_at_time(1, 10);
        let h2 = poa_block_at_time(2, 20);
        let h3 = poa_block_at_time(3, 30);
        let h1_data = postcard::to_allocvec(&h1).expect("h1 should serialize");
        let h2_data = postcard::to_allocvec(&h2).expect("h2 should serialize");
        let h3_data = postcard::to_allocvec(&h3).expect("h3 should serialize");

        let redis_a_client =
            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
        let redis_b_client =
            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
        let mut conn_a = redis_a_client
            .get_connection()
            .expect("redis a connection should open");
        let mut conn_b = redis_b_client
            .get_connection()
            .expect("redis b connection should open");

        // h1 and h2 reach quorum (nodes a+b); h3 exists only on node a.
        append_stream_block(&mut conn_a, &stream_key, 1, &h1_data, 1);
        append_stream_block(&mut conn_b, &stream_key, 1, &h1_data, 1);
        append_stream_block(&mut conn_a, &stream_key, 2, &h2_data, 1);
        append_stream_block(&mut conn_b, &stream_key, 2, &h2_data, 1);
        append_stream_block(&mut conn_a, &stream_key, 3, &h3_data, 1);

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(
            unreconciled_blocks
                .iter()
                .map(|b| u32::from(*b.entity.header().height()))
                .collect::<Vec<_>>(),
            vec![1, 2, 3],
            "Expected all heights including repaired sub-quorum h3",
        );
    }
1689
    // 129 contiguous quorum-backed heights (more than a power-of-two batch
    // boundary of 128) must all reconcile in a single leader_state call.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_contiguous_quorum_blocks_are_present_then_returns_all_available_contiguous_blocks()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:contiguous-over-128".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let redis_a_client =
            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
        let redis_b_client =
            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
        let redis_c_client =
            redis::Client::open(redis_c.redis_url()).expect("redis c client should open");
        let mut conn_a = redis_a_client
            .get_connection()
            .expect("redis a connection should open");
        let mut conn_b = redis_b_client
            .get_connection()
            .expect("redis b connection should open");
        let mut conn_c = redis_c_client
            .get_connection()
            .expect("redis c connection should open");
        // Node c stays empty on purpose; quorum is reached via nodes a and b.
        let _ = &mut conn_c;

        (1_u32..=129_u32).for_each(|height| {
            let block = poa_block_at_time(height, u64::from(height));
            let block_data =
                postcard::to_allocvec(&block).expect("block should serialize");
            append_stream_block(&mut conn_a, &stream_key, height, &block_data, 1);
            append_stream_block(&mut conn_b, &stream_key, height, &block_data, 1);
        });

        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(
            unreconciled_blocks.len(),
            129,
            "Expected all contiguous quorum-backed heights to reconcile in one call",
        );
    }
1756
    // Publishing a non-genesis block before the fencing (epoch) token is
    // initialized must be rejected.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_fencing_token_is_uninitialized_then_returns_error()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:missing-epoch".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key);
        let block = poa_block_at_time(1, 100);

        let publish_result = adapter.publish_produced_block(&block);

        assert!(
            publish_result.is_err(),
            "Publish should fail when fencing token is not initialized"
        );
    }
1782
    // Releasing a lease the adapter never acquired must be a no-op success
    // (release is idempotent for non-owners).
    #[tokio::test(flavor = "multi_thread")]
    async fn release__when_adapter_is_not_lease_owner_then_returns_ok() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:release-follower".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key);

        let release_result = adapter.release().await;

        assert!(
            release_result.is_ok(),
            "Release should be idempotent for adapters that do not own quorum lease"
        );
    }
1806
    // Dropping one clone while another handle is still alive must not trigger
    // the Drop-time lease release (guarded by the Arc strong-count check).
    #[tokio::test(flavor = "multi_thread")]
    async fn drop__when_non_last_clone_is_dropped_then_does_not_release_shared_lease() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:drop-non-last-clone".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        let adapter_clone = adapter.clone();
        let owner_token = adapter.lease_owner_token.clone();

        drop(adapter_clone);
        // Give any (erroneous) async release a chance to run before checking.
        sleep(Duration::from_millis(50)).await;

        let owners = redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, &lease_key).as_deref()
                    == Some(owner_token.as_str())
            })
            .count();
        assert!(
            owners >= 2,
            "Dropping a non-last clone must not release quorum lease ownership"
        );
    }
1847
    // A leader_state call against a free lease should acquire it, report
    // leadership, and leave the owner token on a quorum of nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_lease_is_free_then_acquires_quorum_ownership() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:acquire-on-leader-state".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = RedisLeaderLeaseAdapter::new(
            redis_urls.clone(),
            lease_key.clone(),
            Duration::from_millis(500),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");

        let state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");
        let owners = redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, &lease_key).as_deref()
                    == Some(adapter.lease_owner_token.as_str())
            })
            .count();

        assert!(
            matches!(state, LeaderState::ReconciledLeader),
            "leader_state should acquire and report leader ownership when lease is free"
        );
        assert!(
            owners >= 2,
            "Lease ownership should be present on quorum after acquisition"
        );
    }
1895
    // After the first leader's 300ms lease TTL lapses (we wait 900ms without
    // renewal), a second adapter must be able to take over leadership.
    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_lease_expires_then_another_adapter_becomes_leader() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:ttl-expiry-handoff".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let first_adapter = RedisLeaderLeaseAdapter::new(
            redis_urls.clone(),
            lease_key.clone(),
            Duration::from_millis(300),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("first adapter should be created");
        let second_adapter = RedisLeaderLeaseAdapter::new(
            redis_urls.clone(),
            lease_key.clone(),
            Duration::from_millis(300),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("second adapter should be created");

        let first_state = first_adapter
            .leader_state(1.into())
            .await
            .expect("first leader_state should succeed");
        sleep(Duration::from_millis(900)).await;

        let second_state = second_adapter
            .leader_state(1.into())
            .await
            .expect("second leader_state should succeed");
        let second_owner_count = redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, &lease_key).as_deref()
                    == Some(second_adapter.lease_owner_token.as_str())
            })
            .count();

        assert!(
            matches!(first_state, LeaderState::ReconciledLeader),
            "First adapter should acquire lease initially"
        );
        assert!(
            matches!(second_state, LeaderState::ReconciledLeader),
            "Second adapter should become leader after TTL expiry"
        );
        assert!(
            second_owner_count >= 2,
            "Second adapter should own lease on quorum nodes after takeover"
        );
    }
1964
    // A deposed ("zombie") leader that keeps writing after a lease handoff
    // must be fenced out, and its partial writes must not count as committed.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_previous_leader_writes_after_handoff_then_rejects_zombie_write()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:zombie-leader".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let old_leader = new_test_adapter(redis_urls.clone(), lease_key.clone());
        let current_leader = new_test_adapter(redis_urls, lease_key.clone());
        let block = poa_block_at_time(1, 111);

        assert!(
            old_leader
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Old leader should acquire initial lease"
        );
        // Simulate the handoff: force the old lease off all nodes so the new
        // leader can acquire it.
        clear_lease_on_nodes(
            &[
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            &lease_key,
        );
        assert!(
            current_leader
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Current leader should acquire lease after handoff"
        );

        let zombie_write = old_leader.publish_produced_block(&block);

        assert!(
            zombie_write.is_err(),
            "Old leader write should be fenced after handoff"
        );
        let current_state = current_leader
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");
        assert!(
            matches!(current_state, LeaderState::ReconciledLeader),
            "Zombie partial writes must not be considered committed"
        );
    }
2022
    // One node's stored epoch token is forced behind the leader's; the next
    // successful publish must bring that node's epoch back in sync.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_epoch_is_behind_on_one_node_then_first_write_heals_epoch()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-healing".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key.clone());
        let epoch_key = format!("{lease_key}:epoch:token");
        let block = poa_block_at_time(1, 222);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        let leader_epoch = (*adapter.current_epoch_token.lock().expect("poisoned lock"))
            .expect("epoch should be initialized");
        let stale_epoch = leader_epoch.saturating_sub(1);
        set_epoch(&redis_a.redis_url(), &epoch_key, stale_epoch);

        let publish_result = adapter.publish_produced_block(&block);

        assert!(
            publish_result.is_ok(),
            "Publish should still succeed on quorum"
        );
        let healed_epoch = read_epoch(&redis_a.redis_url(), &epoch_key);
        assert_eq!(
            healed_epoch, leader_epoch,
            "First successful write should heal lagging epoch"
        );
    }
2065
    // A successful publish should refresh the 700ms lease TTL: after waiting
    // 500ms + 400ms (past the original window) the owner token must still be
    // present on a quorum of nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_write_succeeds_then_extends_lease_ttl() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:publish-extends-lease-ttl".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = RedisLeaderLeaseAdapter::new(
            redis_urls.clone(),
            lease_key.clone(),
            Duration::from_millis(700),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let block = poa_block_at_time(1, 444);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        sleep(Duration::from_millis(500)).await;

        let publish_result = adapter.publish_produced_block(&block);
        sleep(Duration::from_millis(400)).await;
        let owners = redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, &lease_key).as_deref()
                    == Some(adapter.lease_owner_token.as_str())
            })
            .count();

        assert!(publish_result.is_ok(), "Publish should succeed on quorum");
        assert!(
            owners >= 2,
            "Successful write should extend lease TTL on quorum beyond original window"
        );
    }
2117
    // Two of three nodes are reassigned to a different lease owner, so the
    // fenced write lands on a single node only: the publish must fail, and the
    // orphan entry must not be treated as reconciled.
    #[tokio::test(flavor = "multi_thread")]
    async fn publish_produced_block__when_write_succeeds_on_less_than_quorum_then_entry_is_not_reconciled()
    {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:partial-write".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];
        let adapter = new_test_adapter(redis_urls, lease_key.clone());
        let block = poa_block_at_time(1, 333);
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Adapter should acquire lease"
        );
        set_lease_owner(
            &redis_b.redis_url(),
            &lease_key,
            "other-owner",
            adapter.lease_ttl_millis,
        );
        set_lease_owner(
            &redis_c.redis_url(),
            &lease_key,
            "other-owner",
            adapter.lease_ttl_millis,
        );

        let publish_result = adapter.publish_produced_block(&block);
        let unreconciled = adapter.unreconciled_blocks(1.into()).await;

        assert!(
            publish_result.is_err(),
            "Publish must fail when fewer than quorum nodes accept write"
        );
        assert!(
            unreconciled.is_err(),
            "Unresolved backlog should return an error instead of empty result"
        );
        assert_eq!(
            stream_len(&redis_a.redis_url(), &stream_key),
            1,
            "One orphan entry should exist on the single successful node"
        );
    }
2173
    // When the stream's latest height (1) is already below the requested next
    // height (2), the cursor fast path should return no blocks at all.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_quorum_latest_height_is_below_next_height_then_returns_empty()
    {
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-fast-path".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let block = poa_block_at_time(1, 10);
        let block_data = postcard::to_allocvec(&block).expect("serialize block");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &block_data, 1);

        let blocks = adapter
            .unreconciled_blocks(2.into())
            .await
            .expect("reconciliation read should succeed");

        assert!(
            blocks.is_empty(),
            "Expected fast path to skip full reconciliation"
        );
    }
2213
    // Incremental reads: the first read from height 1 picks up both existing
    // entries; a later read from height 3 returns only the new matching entry.
    #[tokio::test(flavor = "multi_thread")]
    async fn read_stream_entries_on_node__when_next_height_is_provided_then_reads_matching_entries()
    {
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-incremental-read".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        let h1 = poa_block_at_time(1, 10);
        let h2 = poa_block_at_time(2, 20);
        let h3 = poa_block_at_time(3, 30);
        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
        let redis_node = adapter.redis_nodes[0].clone();

        let first_read = adapter
            .read_stream_entries_on_node(&redis_node, 1, 1000)
            .await
            .expect("first read should succeed");
        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
        let second_read = adapter
            .read_stream_entries_on_node(&redis_node, 3, 1000)
            .await
            .expect("second read should succeed");

        assert_eq!(
            first_read.len(),
            2,
            "Expected initial read to include existing entries"
        );
        assert_eq!(
            second_read.len(),
            1,
            "Expected height-filtered read to include only matching entries"
        );
        assert_eq!(
            u32::from(*second_read[0].2.entity.header().height()),
            3,
            "Expected only the requested next height"
        );
    }
2275
    // Verifies that the `max_entries` argument of `read_stream_entries_on_node`
    // caps the page size, and that a follow-up read starting at a later height
    // returns only the trailing entries.
    #[tokio::test(flavor = "multi_thread")]
    async fn read_stream_entries_on_node__when_max_entries_is_small_then_caps_results() {
        // Given: a single-node Redis with three blocks (heights 1..=3) in the stream.
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-pagination".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        let h1 = poa_block_at_time(1, 10);
        let h2 = poa_block_at_time(2, 20);
        let h3 = poa_block_at_time(3, 30);
        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
        let redis_node = adapter.redis_nodes[0].clone();

        // When: page size 2 from height 1, then page size 2 from height 3.
        let first_page = adapter
            .read_stream_entries_on_node(&redis_node, 1, 2)
            .await
            .expect("first page should succeed");
        let second_page = adapter
            .read_stream_entries_on_node(&redis_node, 3, 2)
            .await
            .expect("second page should succeed");

        // Then: first page is capped at 2 entries in height order; second page
        // holds only the single remaining entry.
        assert_eq!(first_page.len(), 2, "Expected first page to be capped");
        assert_eq!(
            u32::from(*first_page[0].2.entity.header().height()),
            1,
            "Expected first page to start from earliest height"
        );
        assert_eq!(
            u32::from(*first_page[1].2.entity.header().height()),
            2,
            "Expected first page to include second height"
        );
        assert_eq!(
            second_page.len(),
            1,
            "Expected height filter to return only matching trailing entry"
        );
        assert_eq!(
            u32::from(*second_page[0].2.entity.header().height()),
            3,
            "Expected second read to include only the requested next height"
        );
    }
2342
    // Fork-regression scenario: a block already exists in the stream at height 1,
    // a retry at the same height with different contents must be rejected, and a
    // subsequent leader must reconcile the ORIGINAL stream entry, not the retry.
    #[tokio::test(flavor = "multi_thread")]
    async fn partial_publish_then_retry_at_same_height__new_leader_reconciles_stale_block()
    {
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:fork-repro".to_string();
        let stream_key = format!("{lease_key}:block:stream");

        // Given: adapter A holds the lease and block_a sits in the stream at height 1.
        let adapter_a = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter_a should acquire lease"
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        let block_a = poa_block_at_time(1, 100);
        let block_a_data = postcard::to_allocvec(&block_a).expect("serialize block_a");
        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &block_a_data, epoch_a as u32);

        // block_b has the same height but a different timestamp, so it would
        // fork the chain if it were accepted.
        let block_b = poa_block_at_time(1, 999);
        assert_ne!(
            block_a.entity.header().time(),
            block_b.entity.header().time()
        );

        // When: A retries publishing at the occupied height.
        let result = adapter_a.publish_produced_block(&block_b);
        assert!(
            result.is_err(),
            "publish at same height should fail due to HEIGHT_EXISTS"
        );

        // The rejected retry must not have added a second stream entry.
        assert_eq!(stream_len(&redis.redis_url(), &stream_key), 1);

        adapter_a.release().await.expect("release should succeed");

        // Then: a new leader B reconciles exactly the original block_a.
        let adapter_b = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
        assert!(
            adapter_b
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter_b should acquire lease"
        );

        let unreconciled = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");

        assert_eq!(unreconciled.len(), 1, "Should reconcile exactly one block");
        assert_eq!(
            unreconciled[0].entity.header().time(),
            block_a.entity.header().time(),
            "Reconciliation should return block_a (the only entry in the stream)"
        );
    }
2418
    /// Test fixture that owns a throwaway `redis-server` child process bound to
    /// an ephemeral localhost port. The process is killed on `stop()` or `Drop`.
    struct RedisTestServer {
        // Running server process; `None` while stopped.
        child: Option<Child>,
        // Ephemeral port reserved at construction and reused across restarts.
        port: u16,
        // Precomputed `redis://127.0.0.1:{port}/` connection URL.
        redis_url: String,
    }
2424
2425 impl RedisTestServer {
2426 fn spawn() -> Self {
2427 let mut server = Self::new_stopped();
2428 server.start();
2429 server
2430 }
2431
2432 fn new_stopped() -> Self {
2433 let port = bind_unused_port();
2434 Self {
2435 child: None,
2436 port,
2437 redis_url: format!("redis://127.0.0.1:{port}/"),
2438 }
2439 }
2440
2441 fn start(&mut self) {
2442 if self.child.is_some() {
2443 return;
2444 }
2445 let child = spawn_redis_server(self.port);
2446 wait_for_redis_ready(self.port);
2447 self.child = Some(child);
2448 }
2449
2450 fn stop(&mut self) {
2451 if let Some(child) = self.child.as_mut() {
2452 let _ = child.kill();
2453 let _ = child.wait();
2454 }
2455 self.child = None;
2456 }
2457
2458 fn redis_url(&self) -> String {
2459 self.redis_url.clone()
2460 }
2461 }
2462
2463 impl Drop for RedisTestServer {
2464 fn drop(&mut self) {
2465 if let Some(child) = self.child.as_mut() {
2466 let _ = child.kill();
2467 let _ = child.wait();
2468 }
2469 }
2470 }
2471
2472 fn bind_unused_port() -> u16 {
2473 let socket =
2474 TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2475 .expect("Should bind an ephemeral port");
2476 let port = socket.local_addr().expect("Should get local addr").port();
2477 drop(socket);
2478 port
2479 }
2480
2481 fn spawn_redis_server(port: u16) -> Child {
2482 Command::new("redis-server")
2483 .arg("--port")
2484 .arg(port.to_string())
2485 .arg("--save")
2486 .arg("")
2487 .arg("--appendonly")
2488 .arg("no")
2489 .arg("--bind")
2490 .arg("127.0.0.1")
2491 .stdout(Stdio::null())
2492 .stderr(Stdio::null())
2493 .spawn()
2494 .expect("Failed to spawn redis-server")
2495 }
2496
2497 fn wait_for_redis_ready(port: u16) {
2498 let addr = SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, port);
2499 let start = std::time::Instant::now();
2500 let timeout = Duration::from_secs(5);
2501 while start.elapsed() < timeout {
2502 if TcpStream::connect(addr).is_ok() {
2503 return;
2504 }
2505 thread::sleep(Duration::from_millis(10));
2506 }
2507 panic!("Redis server did not become ready on port {port}");
2508 }
2509
2510 fn poa_block_at_time(height: u32, timestamp: u64) -> SealedBlock {
2511 let mut block = Block::default();
2512 block.header_mut().set_block_height(height.into());
2513 block
2514 .header_mut()
2515 .set_time(fuel_core_types::tai64::Tai64(timestamp));
2516 block.header_mut().recalculate_metadata();
2517 SealedBlock {
2518 entity: block,
2519 consensus: Consensus::PoA(Default::default()),
2520 }
2521 }
2522
2523 fn append_stream_block(
2524 conn: &mut redis::Connection,
2525 stream_key: &str,
2526 height: u32,
2527 data: &[u8],
2528 epoch: u32,
2529 ) {
2530 let _: String = redis::cmd("XADD")
2531 .arg(stream_key)
2532 .arg("*")
2533 .arg("height")
2534 .arg(height)
2535 .arg("data")
2536 .arg(data)
2537 .arg("epoch")
2538 .arg(epoch)
2539 .arg("timestamp")
2540 .arg(epoch)
2541 .query(conn)
2542 .expect("stream write should succeed");
2543 }
2544
2545 fn new_test_adapter(
2546 redis_urls: Vec<String>,
2547 lease_key: String,
2548 ) -> RedisLeaderLeaseAdapter {
2549 RedisLeaderLeaseAdapter::new(
2550 redis_urls,
2551 lease_key,
2552 Duration::from_secs(2),
2553 Duration::from_millis(100),
2554 Duration::from_millis(50),
2555 Duration::from_millis(0),
2556 1,
2557 1000,
2558 )
2559 .expect("adapter should be created")
2560 }
2561
2562 fn set_epoch(redis_url: &str, epoch_key: &str, epoch: u64) {
2563 let redis_client =
2564 redis::Client::open(redis_url).expect("redis client should open");
2565 let mut conn = redis_client
2566 .get_connection()
2567 .expect("redis connection should open");
2568 let _: () = redis::cmd("SET")
2569 .arg(epoch_key)
2570 .arg(epoch)
2571 .query(&mut conn)
2572 .expect("epoch set should succeed");
2573 }
2574
2575 fn read_epoch(redis_url: &str, epoch_key: &str) -> u64 {
2576 let redis_client =
2577 redis::Client::open(redis_url).expect("redis client should open");
2578 let mut conn = redis_client
2579 .get_connection()
2580 .expect("redis connection should open");
2581 let epoch: Option<u64> = redis::cmd("GET")
2582 .arg(epoch_key)
2583 .query(&mut conn)
2584 .expect("epoch get should succeed");
2585 epoch.expect("epoch should exist")
2586 }
2587
2588 fn set_lease_owner(
2589 redis_url: &str,
2590 lease_key: &str,
2591 owner: &str,
2592 lease_ttl_millis: u64,
2593 ) {
2594 let redis_client =
2595 redis::Client::open(redis_url).expect("redis client should open");
2596 let mut conn = redis_client
2597 .get_connection()
2598 .expect("redis connection should open");
2599 let _: () = redis::cmd("SET")
2600 .arg(lease_key)
2601 .arg(owner)
2602 .arg("PX")
2603 .arg(lease_ttl_millis)
2604 .query(&mut conn)
2605 .expect("lease owner set should succeed");
2606 }
2607
2608 fn read_lease_owner(redis_url: &str, lease_key: &str) -> Option<String> {
2609 let redis_client =
2610 redis::Client::open(redis_url).expect("redis client should open");
2611 let mut conn = redis_client
2612 .get_connection()
2613 .expect("redis connection should open");
2614 redis::cmd("GET")
2615 .arg(lease_key)
2616 .query(&mut conn)
2617 .expect("lease owner get should succeed")
2618 }
2619
2620 fn clear_lease_on_nodes(redis_urls: &[String], lease_key: &str) {
2621 redis_urls.iter().for_each(|redis_url| {
2622 let redis_client = redis::Client::open(redis_url.as_str())
2623 .expect("redis client should open");
2624 let mut conn = redis_client
2625 .get_connection()
2626 .expect("redis connection should open");
2627 let _: () = redis::cmd("DEL")
2628 .arg(lease_key)
2629 .query(&mut conn)
2630 .expect("lease delete should succeed");
2631 });
2632 }
2633
2634 fn stream_len(redis_url: &str, stream_key: &str) -> usize {
2635 let redis_client =
2636 redis::Client::open(redis_url).expect("redis client should open");
2637 let mut conn = redis_client
2638 .get_connection()
2639 .expect("redis connection should open");
2640 redis::cmd("XLEN")
2641 .arg(stream_key)
2642 .query(&mut conn)
2643 .expect("stream length query should succeed")
2644 }
2645
    // Verifies that reconciliation reads fail loudly (instead of silently
    // returning an empty backlog) when a quorum of Redis nodes is unreachable.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_reads_fail_on_quorum_nodes__returns_error() {
        // Given: three nodes, a leader that published one block to all of them.
        let mut redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:read-failure-fork".to_string();
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire lease"
        );

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");

        let stream_key = format!("{lease_key}:block:stream");
        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);

        adapter_a.release().await.expect("release should succeed");

        // When: two of three nodes go down, so reads cannot reach a quorum.
        redis_a.stop();
        redis_b.stop();

        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            // Hand the adapter an epoch token directly so it proceeds to the
            // reconciliation read without going through lease acquisition.
            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }

        let result = adapter_b.unreconciled_blocks(1.into()).await;

        // Then: the read must surface an error rather than an empty result.
        assert!(
            result.is_err(),
            "unreconciled_blocks must return error when reads fail on quorum of nodes"
        );
    }
2705
    // Data-loss scenario: a block present on 2/3 nodes drops to 1/3 after one
    // node restarts empty; the next leader's reconciliation is still expected
    // to recover it (by reproposing from the surviving copy).
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_redis_node_restarts_and_loses_data__drops_block_below_quorum()
    {
        let redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:data-loss-fork".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire lease"
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        let block = poa_block_at_time(1, 100);
        let block_data = postcard::to_allocvec(&block).expect("serialize");

        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
        let mut conn_b = client_b.get_connection().expect("conn");

        // Given: the block is written to nodes A and B only (2/3 quorum).
        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, epoch_a as u32);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, epoch_a as u32);
        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 0);

        let pre_loss = adapter_a
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");
        assert_eq!(
            pre_loss.len(),
            1,
            "Block should be reconcilable with 2/3 nodes having it"
        );

        // When: node B restarts with persistence disabled, losing its copy —
        // connections are dropped first so the restart isn't held open.
        drop(conn_b);
        drop(client_b);
        redis_b.stop();
        redis_b.start();

        assert_eq!(
            stream_len(&redis_b.redis_url(), &stream_key),
            0,
            "Restarted node should have empty stream"
        );

        adapter_a.release().await.expect("release should succeed");

        // Then: a new leader still reconciles the block from node A's copy.
        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_b
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "New leader should acquire lease"
        );

        let post_loss = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed");

        assert_eq!(
            post_loss.len(),
            1,
            "Repair should recover the block by reproposing from node A to the other nodes"
        );
    }
2807
    // Verifies lock expansion: a leader holding a 2/3 quorum should claim the
    // remaining node as soon as that node's conflicting lease disappears, and
    // subsequent publishes should then land on all three nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn has_lease_owner_quorum__expands_lock_to_non_owned_nodes() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:lock-expansion".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // Given: node C's lease slot is pre-occupied by a rival candidate B.
        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let client = redis::Client::open(redis_c.redis_url()).expect("client");
            let mut conn = client.get_connection().expect("conn");
            let _: () = redis::cmd("SET")
                .arg(&lease_key)
                .arg(&candidate_b.lease_owner_token)
                .arg("PX")
                .arg(5000u64)
                .query(&mut conn)
                .expect("set should succeed");
        }

        // A acquires the lease on the two free nodes — a 2/3 quorum.
        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire quorum (2/3)"
        );

        let owns_a = read_lease_owner(&redis_a.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        let owns_b = read_lease_owner(&redis_b.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        let owns_c = read_lease_owner(&redis_c.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        assert!(owns_a && owns_b, "A should own nodes A and B");
        assert!(!owns_c, "A should NOT own node C (held by B)");

        // When: B's lease on C is cleared, freeing the slot.
        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);
        assert!(
            read_lease_owner(&redis_c.redis_url(), &lease_key).is_none(),
            "Node C should be free after B releases"
        );

        // The quorum check doubles as the expansion trigger.
        let has_quorum = adapter_a
            .has_lease_owner_quorum()
            .await
            .expect("quorum check should succeed");
        assert!(has_quorum, "A should still have quorum");

        // Then: A now owns node C as well, and publishes reach all three nodes.
        let owns_c_after = read_lease_owner(&redis_c.redis_url(), &lease_key)
            == Some(adapter_a.lease_owner_token.clone());
        assert!(owns_c_after, "Lock expansion should have acquired node C");

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed");

        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(
            stream_len(&redis_c.redis_url(), &stream_key),
            1,
            "Block should be written to expanded node C"
        );
    }
2894
    // Verifies that when a leader expands onto a node whose epoch counter is
    // ahead, the leader ends up with a strictly higher epoch than that node
    // had before acquisition, and fenced writes still succeed on all nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn has_lease_owner_quorum__adopts_higher_epoch_from_expanded_node() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-adoption".to_string();
        let epoch_key = format!("{lease_key}:epoch:token");
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // Given: node C briefly held a rival lease and had its epoch counter
        // bumped, then the lease was cleared — leaving C with a stale-but-high
        // epoch and no owner.
        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let client = redis::Client::open(redis_c.redis_url()).expect("client");
            let mut conn = client.get_connection().expect("conn");
            let _: () = redis::cmd("SET")
                .arg(&lease_key)
                .arg(&candidate_b.lease_owner_token)
                .arg("PX")
                .arg(5000u64)
                .query(&mut conn)
                .expect("set should succeed");
            let _: u64 = redis::cmd("INCR")
                .arg(&epoch_key)
                .query(&mut conn)
                .expect("incr should succeed");
        }
        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);

        let epoch_c_before = read_epoch(&redis_c.redis_url(), &epoch_key);

        // When: A acquires the lease across the cluster.
        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        // Then: A's epoch supersedes node C's pre-acquisition epoch, so its
        // writes cannot be fenced off by C's stale counter.
        assert!(
            epoch_a > epoch_c_before,
            "Leader's epoch ({epoch_a}) should be > node C's pre-acquisition epoch ({epoch_c_before})"
        );

        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");

        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
    }
2968
    // Smoke test for the PoA metrics surface: exercises the success, duplicate,
    // fencing, and reconciliation paths, then asserts that every expected
    // metric name appears in the encoded output and that the key counters are
    // non-zero.
    #[tokio::test(flavor = "multi_thread")]
    async fn metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:metrics-smoke".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        // Success path: one clean publish.
        let block1 = poa_block_at_time(1, 100);
        adapter
            .publish_produced_block(&block1)
            .expect("publish should succeed");

        // Duplicate-height path: same height, different contents → rejected.
        let block1_dup = poa_block_at_time(1, 200);
        let dup_result = adapter.publish_produced_block(&block1_dup);
        assert!(dup_result.is_err(), "duplicate height should fail");

        // Fencing path: a second adapter with a stale epoch token attempts a
        // publish; the result is intentionally ignored — only the metric matters.
        let old_adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let mut epoch = old_adapter.current_epoch_token.lock().expect("lock");
            *epoch = Some(1);
        }
        let _zombie = old_adapter.publish_produced_block(&poa_block_at_time(2, 300));

        // Reconciliation path: inject an orphan entry on node A only, then ask
        // for leader state so the unreconciled-blocks machinery runs.
        let orphan = poa_block_at_time(2, 400);
        let orphan_data = postcard::to_allocvec(&orphan).expect("serialize orphan");
        let client_a = redis::Client::open(redis_a.redis_url()).expect("redis client");
        let mut conn_a = client_a.get_connection().expect("redis connection");
        let epoch_val =
            (*adapter.current_epoch_token.lock().expect("lock")).expect("epoch set");
        #[allow(clippy::cast_possible_truncation)]
        let epoch_u32 = epoch_val as u32;
        append_stream_block(&mut conn_a, &stream_key, 2, &orphan_data, epoch_u32);

        let state = adapter
            .leader_state(2.into())
            .await
            .expect("leader_state should succeed");
        assert!(
            matches!(
                state,
                LeaderState::UnreconciledBlocks(ref blocks) if !blocks.is_empty()
            ),
            "Should have unreconciled blocks: {state:?}"
        );

        let encoded =
            fuel_core_metrics::encode_metrics().expect("encode_metrics should succeed");

        // Dump all poa_* lines to stderr to ease debugging on failure.
        let poa_lines: Vec<&str> =
            encoded.lines().filter(|l| l.contains("poa_")).collect();
        for line in &poa_lines {
            eprintln!("{line}");
        }

        // Every metric name must be present in the encoded output.
        let expected_names = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_epoch_max_drift",
            "poa_stream_trim_headroom",
            "poa_write_block_success_total",
            "poa_write_block_height_exists_total",
            "poa_write_block_fencing_error_total",
            "poa_write_block_error_total",
            "poa_repair_success_total",
            "poa_promotion_success_total",
            "poa_promotion_duration_s",
            "poa_write_block_duration_s",
            "poa_reconciliation_duration_s",
            "poa_connection_reset_total",
        ];
        for name in &expected_names {
            assert!(
                encoded.contains(name),
                "Metric '{name}' missing from /v1/metrics output"
            );
        }

        // The paths exercised above must have moved these off zero. The filter
        // skips comment lines and longer names sharing the same prefix
        // (e.g. `poa_leader_epoch_…`).
        let non_zero_metrics = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_write_block_success_total",
            "poa_promotion_success_total",
            "poa_repair_success_total",
        ];
        for name in &non_zero_metrics {
            let metric_line = encoded
                .lines()
                .find(|l| {
                    l.starts_with(name)
                        && !l.starts_with(&format!("{name}_"))
                        && !l.starts_with('#')
                })
                .unwrap_or_else(|| panic!("No data line for {name}"));
            assert!(
                !metric_line.ends_with(" 0"),
                "Metric '{name}' should be non-zero, got: {metric_line}"
            );
        }
    }
3104
    // Verifies that a failed quorum read does not advance any internal cursor:
    // once the downed nodes come back (re-seeded, since persistence is off),
    // the same backlog entry must still be returned.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__after_quorum_read_failure_then_backlog_remains_readable()
    {
        let mut redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-restore-quorum".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // Given: one block published to all three nodes, lease released.
        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Should acquire lease"
        );

        let block = poa_block_at_time(1, 100);
        adapter
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");
        adapter.release().await.expect("release should succeed");

        // When: two nodes go down and a reconciliation read fails on quorum.
        redis_a.stop();
        redis_b.stop();

        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            // Inject an epoch token directly so the adapter attempts the read
            // without acquiring the lease first.
            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }
        let result = adapter_b.unreconciled_blocks(1.into()).await;
        assert!(result.is_err(), "Should fail with quorum read failure");

        // Nodes restart empty (persistence disabled), so re-seed their streams.
        redis_a.start();
        redis_b.start();

        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
        let mut conn_b = client_b.get_connection().expect("conn");
        let block_data = postcard::to_allocvec(&block).expect("serialize");
        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 1);

        // Then: the backlog entry is still readable after the earlier failure.
        let blocks = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed now");
        assert_eq!(
            blocks.len(),
            1,
            "Quorum read failure should not make backlog entries unreadable"
        );
    }
3173
    // Verifies that a failed repair attempt (adapter has an epoch token but no
    // lock) does not consume the backlog: after properly acquiring the lease,
    // the same entry must still be reconcilable.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__after_repair_failure_then_backlog_remains_readable() {
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-restore-repair".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // Given: a backlog entry that exists on node A only (below quorum).
        let block = poa_block_at_time(1, 100);
        let block_data = postcard::to_allocvec(&block).expect("serialize");

        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);

        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            // Inject an epoch token without holding the lock, so the repair
            // attempt below runs but cannot complete.
            let mut epoch = adapter.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }

        // When: reconciliation is attempted without the lock — it must error.
        let result = adapter.unreconciled_blocks(1.into()).await;
        assert!(
            result.is_err(),
            "Should return error when repair fails and backlog remains unresolved"
        );

        // Then: after legitimately acquiring the lease, the entry is still there.
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Should acquire lease"
        );

        let blocks = adapter
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed with lock held");
        assert_eq!(
            blocks.len(),
            1,
            "Repair failure should not make backlog unreadable on the next round"
        );
    }
3235}