Skip to main content

fuel_core/service/adapters/consensus_module/
poa.rs

1use crate::{
2    fuel_core_graphql_api::ports::ConsensusModulePort,
3    service::adapters::{
4        BlockImporterAdapter,
5        BlockProducerAdapter,
6        P2PAdapter,
7        PoAAdapter,
8        TxPoolAdapter,
9    },
10};
11use anyhow::anyhow;
12use fuel_core_importer::ports::{
13    BlockReconciliationWritePort,
14    ImporterDatabase,
15};
16use fuel_core_metrics::poa_metrics::poa_metrics;
17use fuel_core_poa::{
18    ports::{
19        BlockImporter,
20        BlockReconciliationReadPort,
21        LeaderState,
22        P2pPort,
23        PredefinedBlocks,
24        TransactionPool,
25        TransactionsSource,
26    },
27    service::{
28        Mode,
29        SharedState,
30    },
31};
32use fuel_core_services::stream::BoxStream;
33use fuel_core_storage::transactional::Changes;
34use fuel_core_types::{
35    blockchain::{
36        SealedBlock,
37        block::Block,
38        primitives::BlockId,
39    },
40    fuel_types::BlockHeight,
41    services::{
42        block_importer::{
43            BlockImportInfo,
44            UncommittedResult as UncommittedImporterResult,
45        },
46        executor::UncommittedResult,
47    },
48    tai64::Tai64,
49};
50use std::{
51    collections::HashMap,
52    path::{
53        Path,
54        PathBuf,
55    },
56    time::Duration,
57};
58use tokio::{
59    sync::{
60        Mutex,
61        watch,
62    },
63    time::{
64        Instant,
65        sleep,
66        timeout,
67    },
68};
69use tokio_stream::{
70    StreamExt,
71    wrappers::BroadcastStream,
72};
73use tracing::error;
74
75pub mod pre_confirmation_signature;
76
/// Lua source of `check_lease_owner.lua`, embedded at compile time.
///
/// Invoked with the lease key as its only key and this adapter's owner token
/// as its only argument; an integer reply of `1` is treated as "lease held by
/// this owner".
const CHECK_LEASE_OWNER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/check_lease_owner.lua"
));

/// Lua source of `release_lock.lua`, embedded at compile time.
///
/// Invoked with the lease key as its only key and the owner token as its only
/// argument; an integer reply of `1` is treated as "released".
const RELEASE_LOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/release_lock.lua"
));

/// Lua source of `promote_leader.lua`, embedded at compile time.
///
/// Invoked with the lease key and the epoch key as keys, and the owner token
/// and the lease TTL in milliseconds as arguments. The reply is read as a
/// `u64` epoch token; an error whose text contains `LOCK_HELD:` is treated as
/// an ordinary "could not promote" outcome rather than a connection fault.
const PROMOTE_LEADER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/promote_leader.lua"
));

/// Lua source of `write_block.lua`, embedded at compile time.
///
// NOTE(review): presumably used when publishing a produced block to the block
// stream — confirm against the call site and document keys/arguments here.
const WRITE_BLOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/write_block.lua"
));

/// Lua source of `read_stream_entries.lua`, embedded at compile time.
///
/// Invoked with the block stream key as its only key, and the next expected
/// height plus a maximum entry count as arguments. The reply is decoded as a
/// list of `(height, epoch, block bytes, stream id)` tuples.
const READ_STREAM_ENTRIES_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_stream_entries.lua"
));

/// Lua source of `read_latest_stream_entry.lua`, embedded at compile time.
///
/// Invoked with the block stream key as its only key. The reply is decoded as
/// a list of strings: exactly two elements are read as `(height, stream id)`,
/// any other length is treated as "no entry".
const READ_LATEST_STREAM_ENTRY_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_latest_stream_entry.lua"
));
106
107struct RedisNode {
108    redis_client: redis::Client,
109    cached_connection: Mutex<Option<redis::aio::MultiplexedConnection>>,
110}
111
112impl Clone for RedisNode {
113    fn clone(&self) -> Self {
114        Self {
115            redis_client: self.redis_client.clone(),
116            cached_connection: Mutex::new(None),
117        }
118    }
119}
120
121pub struct RedisLeaderLeaseAdapter {
122    redis_nodes: Vec<RedisNode>,
123    quorum: usize,
124    quorum_disruption_budget: u32,
125    lease_key: String,
126    epoch_key: String,
127    block_stream_key: String,
128    lease_owner_token: String,
129    drop_release_guard: std::sync::Arc<()>,
130    current_epoch_token: std::sync::Arc<std::sync::Mutex<Option<u64>>>,
131    lease_ttl_millis: u64,
132    lease_drift_millis: u64,
133    node_timeout: Duration,
134    retry_delay_millis: u64,
135    max_retry_delay_offset_millis: u64,
136    max_attempts: usize,
137    stream_max_len: u32,
138}
139
140impl Clone for RedisLeaderLeaseAdapter {
141    fn clone(&self) -> Self {
142        Self {
143            redis_nodes: self.redis_nodes.clone(),
144            quorum: self.quorum,
145            quorum_disruption_budget: self.quorum_disruption_budget,
146            lease_key: self.lease_key.clone(),
147            epoch_key: self.epoch_key.clone(),
148            block_stream_key: self.block_stream_key.clone(),
149            lease_owner_token: self.lease_owner_token.clone(),
150            drop_release_guard: self.drop_release_guard.clone(),
151            current_epoch_token: self.current_epoch_token.clone(),
152            lease_ttl_millis: self.lease_ttl_millis,
153            lease_drift_millis: self.lease_drift_millis,
154            node_timeout: self.node_timeout,
155            retry_delay_millis: self.retry_delay_millis,
156            max_retry_delay_offset_millis: self.max_retry_delay_offset_millis,
157            max_attempts: self.max_attempts,
158            stream_max_len: self.stream_max_len,
159        }
160    }
161}
162
/// Stateless reconciliation adapter.
// NOTE(review): presumably selected when Redis-based leader election is not
// configured — confirm against the trait implementations.
#[derive(Default, Clone)]
pub struct NoopReconciliationAdapter;

/// Reconciliation backend: either the Redis-backed leader lease adapter or
/// the stateless no-op adapter.
// The `Redis` variant carries the whole adapter inline while `Noop` is a unit
// struct, which is what triggers `large_enum_variant`.
#[allow(clippy::large_enum_variant)]
pub enum ReconciliationAdapter {
    /// Reconciliation backed by a Redis leader lease.
    Redis(RedisLeaderLeaseAdapter),
    /// Reconciliation backed by the no-op adapter.
    Noop(NoopReconciliationAdapter),
}
171
172impl RedisLeaderLeaseAdapter {
173    fn calculate_quorum(redis_nodes_len: usize, quorum_disruption_budget: u32) -> usize {
174        let majority = redis_nodes_len
175            .checked_div(2)
176            .unwrap_or(0)
177            .saturating_add(1);
178        let disruption_budget = usize::try_from(quorum_disruption_budget).unwrap_or(0);
179        majority
180            .saturating_add(disruption_budget)
181            .min(redis_nodes_len)
182    }
183
184    #[allow(clippy::too_many_arguments)]
185    pub fn new(
186        redis_urls: Vec<String>,
187        lease_key: String,
188        lease_ttl: Duration,
189        node_timeout: Duration,
190        retry_delay: Duration,
191        max_retry_delay_offset: Duration,
192        max_attempts: u32,
193        stream_max_len: u32,
194    ) -> anyhow::Result<Self> {
195        let redis_nodes = redis_urls
196            .into_iter()
197            .map(|redis_url| {
198                redis::Client::open(redis_url).map(|redis_client| RedisNode {
199                    redis_client,
200                    cached_connection: Mutex::new(None),
201                })
202            })
203            .collect::<Result<Vec<_>, _>>()?;
204        if redis_nodes.is_empty() {
205            return Err(anyhow!(
206                "At least one redis url is required for leader lock"
207            ));
208        }
209        let quorum_disruption_budget = 0u32;
210        let quorum = Self::calculate_quorum(redis_nodes.len(), quorum_disruption_budget);
211        let lease_ttl_millis = u64::try_from(lease_ttl.as_millis())?;
212        let retry_delay_millis = u64::try_from(retry_delay.as_millis())?;
213        let max_retry_delay_offset_millis =
214            u64::try_from(max_retry_delay_offset.as_millis())?;
215        let max_attempts = usize::try_from(max_attempts)?.max(1);
216        let lease_owner_token = uuid::Uuid::new_v4().to_string();
217        let epoch_key = format!("{lease_key}:epoch:token");
218        let block_stream_key = format!("{lease_key}:block:stream");
219        let lease_drift_millis = lease_ttl_millis
220            .checked_div(100)
221            .unwrap_or(0)
222            .saturating_add(2);
223        Ok(Self {
224            redis_nodes,
225            quorum,
226            quorum_disruption_budget,
227            lease_key,
228            epoch_key,
229            block_stream_key,
230            lease_owner_token,
231            drop_release_guard: std::sync::Arc::new(()),
232            current_epoch_token: std::sync::Arc::new(std::sync::Mutex::new(None)),
233            lease_ttl_millis,
234            lease_drift_millis,
235            node_timeout,
236            retry_delay_millis,
237            max_retry_delay_offset_millis,
238            max_attempts,
239            stream_max_len,
240        })
241    }
242
243    pub fn with_quorum_disruption_budget(
244        mut self,
245        quorum_disruption_budget: u32,
246    ) -> Self {
247        self.quorum_disruption_budget = quorum_disruption_budget;
248        self.quorum =
249            Self::calculate_quorum(self.redis_nodes.len(), quorum_disruption_budget);
250        self
251    }
252
253    async fn multiplexed_connection(
254        &self,
255        redis_node: &RedisNode,
256    ) -> anyhow::Result<redis::aio::MultiplexedConnection> {
257        if let Some(connection) =
258            redis_node.cached_connection.lock().await.as_ref().cloned()
259        {
260            return Ok(connection);
261        }
262
263        let new_connection = timeout(
264            self.node_timeout,
265            redis_node.redis_client.get_multiplexed_async_connection(),
266        )
267        .await
268        .map_err(|_| anyhow!("Timed out while connecting to redis leader-lock node"))??;
269        let mut cached_connection = redis_node.cached_connection.lock().await;
270        if let Some(connection) = cached_connection.as_ref().cloned() {
271            return Ok(connection);
272        }
273        *cached_connection = Some(new_connection.clone());
274        Ok(new_connection)
275    }
276
277    async fn clear_cached_connection(&self, redis_node: &RedisNode) {
278        let mut cached_connection = redis_node.cached_connection.lock().await;
279        *cached_connection = None;
280        poa_metrics().connection_reset_total.inc();
281    }
282
283    async fn check_lease_owner_on_node(&self, redis_node: &RedisNode) -> bool {
284        let mut connection = match self.multiplexed_connection(redis_node).await {
285            Ok(connection) => connection,
286            Err(_) => return false,
287        };
288        let is_owner = timeout(
289            self.node_timeout,
290            redis::Script::new(CHECK_LEASE_OWNER_SCRIPT)
291                .key(&self.lease_key)
292                .arg(&self.lease_owner_token)
293                .invoke_async::<i32>(&mut connection),
294        )
295        .await;
296        match is_owner {
297            Ok(Ok(is_owner)) => is_owner == 1,
298            Err(_) => {
299                self.clear_cached_connection(redis_node).await;
300                false
301            }
302            Ok(Err(_)) => {
303                self.clear_cached_connection(redis_node).await;
304                false
305            }
306        }
307    }
308
309    async fn promote_leader_on_node(
310        &self,
311        redis_node: &RedisNode,
312    ) -> anyhow::Result<Option<u64>> {
313        let mut connection = match self.multiplexed_connection(redis_node).await {
314            Ok(connection) => connection,
315            Err(_) => return Ok(None),
316        };
317        let promoted = timeout(
318            self.node_timeout,
319            redis::Script::new(PROMOTE_LEADER_SCRIPT)
320                .key(&self.lease_key)
321                .key(&self.epoch_key)
322                .arg(&self.lease_owner_token)
323                .arg(self.lease_ttl_millis)
324                .invoke_async::<u64>(&mut connection),
325        )
326        .await;
327        match promoted {
328            Ok(Ok(token)) => Ok(Some(token)),
329            Ok(Err(err)) => {
330                if err.to_string().contains("LOCK_HELD:") {
331                    return Ok(None);
332                }
333                self.clear_cached_connection(redis_node).await;
334                Ok(None)
335            }
336            Err(_) => {
337                self.clear_cached_connection(redis_node).await;
338                Ok(None)
339            }
340        }
341    }
342
343    async fn release_lease_on_node(&self, redis_node: &RedisNode) -> bool {
344        let mut connection = match self.multiplexed_connection(redis_node).await {
345            Ok(connection) => connection,
346            Err(_) => return false,
347        };
348        let released = timeout(
349            self.node_timeout,
350            redis::Script::new(RELEASE_LOCK_SCRIPT)
351                .key(&self.lease_key)
352                .arg(&self.lease_owner_token)
353                .invoke_async::<i32>(&mut connection),
354        )
355        .await;
356        match released {
357            Ok(Ok(released)) => released == 1,
358            Err(_) => {
359                self.clear_cached_connection(redis_node).await;
360                false
361            }
362            Ok(Err(_)) => {
363                self.clear_cached_connection(redis_node).await;
364                false
365            }
366        }
367    }
368
369    fn quorum_reached(&self, success_count: usize) -> bool {
370        success_count >= self.quorum
371    }
372
373    fn calculate_remaining_validity_millis(&self, elapsed_millis: u64) -> u64 {
374        self.lease_ttl_millis
375            .saturating_sub(elapsed_millis.saturating_add(self.lease_drift_millis))
376    }
377
378    fn random_retry_delay_offset_millis(&self) -> u64 {
379        if self.max_retry_delay_offset_millis == 0 {
380            return 0;
381        }
382        rand::random::<u64>()
383            .checked_rem(self.max_retry_delay_offset_millis.saturating_add(1))
384            .unwrap_or(0)
385    }
386
387    async fn release_lease_on_all_nodes(&self) {
388        let _ = futures::future::join_all(
389            self.redis_nodes
390                .iter()
391                .map(|redis_node| self.release_lease_on_node(redis_node)),
392        )
393        .await;
394    }
395
396    async fn delay_next_retry(&self) {
397        let retry_delay_millis = self
398            .retry_delay_millis
399            .saturating_add(self.random_retry_delay_offset_millis());
400        sleep(Duration::from_millis(retry_delay_millis)).await;
401    }
402
403    async fn has_lease_owner_quorum(&self) -> anyhow::Result<bool> {
404        let ownership = futures::future::join_all(
405            self.redis_nodes
406                .iter()
407                .map(|redis_node| self.check_lease_owner_on_node(redis_node)),
408        )
409        .await;
410        let owner_count = ownership.iter().filter(|&&is_owner| is_owner).count();
411        if !self.quorum_reached(owner_count) {
412            tracing::info!(
413                owner_count,
414                quorum = self.quorum,
415                redis_nodes = self.redis_nodes.len(),
416                current_epoch_token = ?self.current_epoch_token_value(),
417                lease_key = %self.lease_key,
418                "Leader lock quorum not held by this authority"
419            );
420            return Ok(false);
421        }
422
423        // Best-effort: acquire the lock on nodes we don't own yet.
424        // Expands write coverage beyond minimum quorum so block data
425        // is replicated to more nodes, improving fault tolerance.
426        // If a newly-acquired node returns a higher epoch (from
427        // election storm drift), adopt it so write_block.lua uses
428        // a consistent epoch across all owned nodes.
429        let non_owned: Vec<&RedisNode> = self
430            .redis_nodes
431            .iter()
432            .zip(ownership.iter())
433            .filter(|(_, is_owner)| !**is_owner)
434            .map(|(node, _)| node)
435            .collect();
436
437        if !non_owned.is_empty() {
438            let results = futures::future::join_all(
439                non_owned
440                    .into_iter()
441                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
442            )
443            .await;
444
445            if let Some(max_new) =
446                results.into_iter().filter_map(|r| r.ok().flatten()).max()
447                && let Ok(mut epoch) = self.current_epoch_token.lock()
448            {
449                let current = epoch.unwrap_or(0);
450                if max_new > current {
451                    tracing::debug!(
452                        old_epoch = current,
453                        new_epoch = max_new,
454                        "Adopted higher epoch from lock expansion"
455                    );
456                    *epoch = Some(max_new);
457                }
458            }
459        }
460
461        Ok(true)
462    }
463
464    async fn acquire_lease_if_free(&self) -> anyhow::Result<bool> {
465        let promotion_start = std::time::Instant::now();
466        for attempt_index in 0..self.max_attempts {
467            let start = std::time::Instant::now();
468            let promoted_nodes = futures::future::join_all(
469                self.redis_nodes
470                    .iter()
471                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
472            )
473            .await;
474            let promoted_tokens = promoted_nodes
475                .into_iter()
476                .filter_map(|token| token.ok().flatten())
477                .collect::<Vec<_>>();
478            let acquired_count = promoted_tokens.len();
479            let elapsed_millis =
480                u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
481            let validity_millis =
482                self.calculate_remaining_validity_millis(elapsed_millis);
483            tracing::debug!(
484                attempt = attempt_index.saturating_add(1),
485                max_attempts = self.max_attempts,
486                acquired_count,
487                quorum = self.quorum,
488                redis_nodes = self.redis_nodes.len(),
489                elapsed_millis,
490                validity_millis,
491                promoted_tokens = ?promoted_tokens,
492                current_epoch_token = ?self.current_epoch_token_value(),
493                lease_key = %self.lease_key,
494                "Leader lock promotion attempt finished"
495            );
496            if self.quorum_reached(acquired_count) && validity_millis > 0 {
497                // Record epoch drift across quorum nodes
498                if promoted_tokens.len() > 1
499                    && let (Some(min_tok), Some(max_tok)) = (
500                        promoted_tokens.iter().copied().min(),
501                        promoted_tokens.iter().copied().max(),
502                    )
503                {
504                    poa_metrics().epoch_max_drift.set(
505                        i64::try_from(max_tok.saturating_sub(min_tok))
506                            .unwrap_or(i64::MAX),
507                    );
508                }
509                if let Some(max_token) = promoted_tokens.into_iter().max() {
510                    let mut current_epoch_token = self
511                        .current_epoch_token
512                        .lock()
513                        .map_err(|e| anyhow!("epoch token lock poisoned: {}", e))?;
514                    *current_epoch_token = Some(max_token);
515                    poa_metrics()
516                        .leader_epoch
517                        .set(i64::try_from(max_token).unwrap_or(i64::MAX));
518                }
519                poa_metrics().promotion_success_total.inc();
520                poa_metrics()
521                    .promotion_duration_s
522                    .observe(promotion_start.elapsed().as_secs_f64());
523                return Ok(true);
524            }
525            self.release_lease_on_all_nodes().await;
526            let is_last_attempt = attempt_index.saturating_add(1) == self.max_attempts;
527            if !is_last_attempt {
528                self.delay_next_retry().await;
529            }
530        }
531        tracing::warn!(
532            max_attempts = self.max_attempts,
533            quorum = self.quorum,
534            redis_nodes = self.redis_nodes.len(),
535            current_epoch_token = ?self.current_epoch_token_value(),
536            lease_key = %self.lease_key,
537            "Leader lock promotion failed; cannot produce block"
538        );
539        poa_metrics().promotion_failure_total.inc();
540        poa_metrics()
541            .promotion_duration_s
542            .observe(promotion_start.elapsed().as_secs_f64());
543        Ok(false)
544    }
545
546    async fn release_lease_on_client(
547        redis_client: redis::Client,
548        lease_key: String,
549        lease_owner_token: String,
550        node_timeout: Duration,
551    ) {
552        let connection = timeout(
553            node_timeout,
554            redis_client.get_multiplexed_async_connection(),
555        )
556        .await;
557        let mut connection = match connection {
558            Ok(Ok(connection)) => connection,
559            Err(_) => return,
560            Ok(Err(_)) => return,
561        };
562        let _ = timeout(
563            node_timeout,
564            redis::Script::new(RELEASE_LOCK_SCRIPT)
565                .key(lease_key)
566                .arg(lease_owner_token)
567                .invoke_async::<i32>(&mut connection),
568        )
569        .await;
570    }
571
572    async fn release_lease_on_clients(
573        redis_clients: Vec<redis::Client>,
574        lease_key: String,
575        lease_owner_token: String,
576        node_timeout: Duration,
577    ) {
578        let _ =
579            futures::future::join_all(redis_clients.into_iter().map(|redis_client| {
580                Self::release_lease_on_client(
581                    redis_client,
582                    lease_key.clone(),
583                    lease_owner_token.clone(),
584                    node_timeout,
585                )
586            }))
587            .await;
588    }
589
590    fn release_lease_on_clients_sync(
591        redis_clients: Vec<redis::Client>,
592        lease_key: String,
593        lease_owner_token: String,
594    ) {
595        redis_clients.into_iter().for_each(|redis_client| {
596            let Ok(mut connection) = redis_client.get_connection() else {
597                return;
598            };
599            let _ = redis::Script::new(RELEASE_LOCK_SCRIPT)
600                .key(&lease_key)
601                .arg(&lease_owner_token)
602                .invoke::<i32>(&mut connection);
603        });
604    }
605
606    async fn read_latest_stream_entry_on_node(
607        &self,
608        redis_node: &RedisNode,
609    ) -> anyhow::Result<Option<(u32, String)>> {
610        let mut connection = self.multiplexed_connection(redis_node).await?;
611        let latest_entry = timeout(
612            self.node_timeout,
613            redis::Script::new(READ_LATEST_STREAM_ENTRY_SCRIPT)
614                .key(&self.block_stream_key)
615                .invoke_async::<Vec<String>>(&mut connection),
616        )
617        .await;
618        match latest_entry {
619            Err(_) => {
620                self.clear_cached_connection(redis_node).await;
621                Err(anyhow!(
622                    "Timed out reading latest stream entry from Redis node"
623                ))
624            }
625            Ok(Err(e)) => {
626                self.clear_cached_connection(redis_node).await;
627                Err(anyhow!(
628                    "Failed to read latest stream entry from Redis node: {e}"
629                ))
630            }
631            Ok(Ok(entry)) => {
632                if entry.len() != 2 {
633                    return Ok(None);
634                }
635                let height = entry[0]
636                    .parse::<u32>()
637                    .map_err(|e| anyhow!("Invalid latest stream entry height: {e}"))?;
638                Ok(Some((height, entry[1].clone())))
639            }
640        }
641    }
642
643    async fn should_reconcile_from_stream(
644        &self,
645        next_height: BlockHeight,
646    ) -> anyhow::Result<bool> {
647        let next_height = u32::from(next_height);
648        let latest_results = futures::future::join_all(
649            self.redis_nodes
650                .iter()
651                .map(|redis_node| self.read_latest_stream_entry_on_node(redis_node)),
652        )
653        .await;
654        let mut successful_reads = 0usize;
655        let mut failed_count = 0usize;
656        let mut nodes_indicating_backlog = 0usize;
657        for result in latest_results {
658            match result {
659                Ok(Some((latest_height, _latest_stream_id))) => {
660                    successful_reads = successful_reads.saturating_add(1);
661                    if latest_height >= next_height {
662                        nodes_indicating_backlog =
663                            nodes_indicating_backlog.saturating_add(1);
664                    }
665                }
666                Ok(None) => {
667                    successful_reads = successful_reads.saturating_add(1);
668                }
669                Err(e) => {
670                    tracing::warn!("Redis latest stream read failed: {e}");
671                    failed_count = failed_count.saturating_add(1);
672                }
673            }
674        }
675        if !self.quorum_reached(successful_reads) {
676            return Err(anyhow!(
677                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
678                successful_reads,
679                self.redis_nodes.len(),
680                failed_count
681            ));
682        }
683        Ok(nodes_indicating_backlog > 0)
684    }
685
686    async fn read_stream_entries_on_node(
687        &self,
688        redis_node: &RedisNode,
689        next_height: u32,
690        max_entries: usize,
691    ) -> anyhow::Result<Vec<(u32, u64, SealedBlock)>> {
692        if max_entries == 0 {
693            return Ok(Vec::new());
694        }
695
696        let mut connection = self.multiplexed_connection(redis_node).await?;
697        let count = u32::try_from(max_entries).unwrap_or(u32::MAX);
698        let stream_entries = timeout(
699            self.node_timeout,
700            redis::Script::new(READ_STREAM_ENTRIES_SCRIPT)
701                .key(&self.block_stream_key)
702                .arg(next_height)
703                .arg(count)
704                .invoke_async::<Vec<(u32, u64, Vec<u8>, String)>>(&mut connection),
705        )
706        .await;
707
708        let entries = match stream_entries {
709            Err(_) => {
710                self.clear_cached_connection(redis_node).await;
711                return Err(anyhow!("Timed out reading stream entries from Redis node"));
712            }
713            Ok(Err(e)) => {
714                self.clear_cached_connection(redis_node).await;
715                return Err(anyhow!(
716                    "Failed to read stream entries from Redis node: {e}"
717                ));
718            }
719            Ok(Ok(entries)) => entries,
720        };
721
722        let mut blocks = Vec::new();
723        for (height, epoch, bytes, _stream_id) in entries {
724            match postcard::from_bytes::<SealedBlock>(&bytes) {
725                Ok(block) => blocks.push((height, epoch, block)),
726                Err(e) => {
727                    tracing::warn!(
728                        "Skipping stream entry: failed to deserialize block at height {height}: {e}"
729                    );
730                }
731            }
732        }
733
734        Ok(blocks)
735    }
736
737    async fn unreconciled_blocks(
738        &self,
739        next_height: BlockHeight,
740    ) -> anyhow::Result<Vec<SealedBlock>> {
741        if !self.should_reconcile_from_stream(next_height).await? {
742            return Ok(Vec::new());
743        }
744        let mut reconciled = Vec::new();
745        let max_reconcile_blocks_per_round =
746            usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
747        let next_height_u32 = u32::from(next_height);
748        let read_results =
749            futures::future::join_all(self.redis_nodes.iter().map(|redis_node| {
750                self.read_stream_entries_on_node(
751                    redis_node,
752                    next_height_u32,
753                    max_reconcile_blocks_per_round,
754                )
755            }))
756            .await;
757
758        let mut successful_reads = Vec::new();
759        let mut failed_count = 0usize;
760        for result in read_results {
761            match result {
762                Ok(entries) => successful_reads.push(entries),
763                Err(e) => {
764                    tracing::warn!("Redis stream read failed: {e}");
765                    failed_count = failed_count.saturating_add(1);
766                }
767            }
768        }
769
770        if !self.quorum_reached(successful_reads.len()) {
771            return Err(anyhow!(
772                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
773                successful_reads.len(),
774                self.redis_nodes.len(),
775                failed_count
776            ));
777        }
778
779        let blocks_by_node = successful_reads
780            .into_iter()
781            .map(|entries| {
782                entries.into_iter().fold(
783                    HashMap::<u32, HashMap<u64, SealedBlock>>::new(),
784                    |mut blocks_by_height, (height, epoch, block)| {
785                        blocks_by_height
786                            .entry(height)
787                            .or_default()
788                            .insert(epoch, block);
789                        blocks_by_height
790                    },
791                )
792            })
793            .collect::<Vec<_>>();
794
795        // Compute stream trim headroom: min stream height - local committed height
796        let min_stream_height = blocks_by_node
797            .iter()
798            .flat_map(|blocks_by_height| blocks_by_height.keys().copied())
799            .min();
800        if let Some(min_h) = min_stream_height {
801            let local_committed = i64::from(u32::from(next_height).saturating_sub(1));
802            let headroom = i64::from(min_h).saturating_sub(local_committed);
803            poa_metrics().stream_trim_headroom.set(headroom);
804        }
805
806        let mut current_height = u32::from(next_height);
807
808        for _ in 0..max_reconcile_blocks_per_round {
809            let nodes_with_height = blocks_by_node
810                .iter()
811                .filter(|blocks_by_height| blocks_by_height.contains_key(&current_height))
812                .count();
813
814            tracing::debug!(
815                "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
816                blocks_by_node.len()
817            );
818
819            if nodes_with_height == 0 {
820                if reconciled.is_empty() {
821                    return Err(anyhow!(
822                        "Backlog unresolved at height {current_height}: \
823                         stream indicates backlog but no entries found at next height"
824                    ));
825                }
826                break;
827            }
828
829            // Group votes by block_id only (not epoch). The same block can
830            // be written to different nodes with different epochs during
831            // re-promotion storms — but if the block_id matches, it's the
832            // same block and all copies count toward quorum. We track the
833            // max epoch per block_id as the tiebreaker for fork resolution
834            // when block_ids genuinely differ.
835            let votes = blocks_by_node
836                .iter()
837                .filter_map(|blocks_by_height| blocks_by_height.get(&current_height))
838                .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
839                .fold(
840                    HashMap::<BlockId, (u64, usize, SealedBlock)>::new(),
841                    |mut votes, (epoch, block)| {
842                        let vote_key = block.entity.id();
843                        match votes.get_mut(&vote_key) {
844                            Some((max_epoch, count, _)) => {
845                                *count = count.saturating_add(1);
846                                if *epoch > *max_epoch {
847                                    *max_epoch = *epoch;
848                                }
849                            }
850                            None => {
851                                votes.insert(vote_key, (*epoch, 1, block.clone()));
852                            }
853                        }
854                        votes
855                    },
856                );
857
858            let winner = votes
859                .into_iter()
860                .max_by_key(|(_, (max_epoch, _, _))| *max_epoch)
861                .map(|(_, (_, count, block))| (count, block));
862
863            if let Some((count, block)) = winner {
864                if self.quorum_reached(count) {
865                    // Block already has quorum — reconcile it directly
866                    reconciled.push(block);
867                } else {
868                    // Sub-quorum block: repropose to all nodes to reach quorum.
869                    // This repairs orphaned partial writes from failed leaders.
870                    // HEIGHT_EXISTS on nodes that already have the block returns
871                    // Ok(false), and nodes missing it accept the write.
872                    tracing::info!(
873                        "Repairing sub-quorum block at height {current_height} \
874                         (found on {count}/{} nodes)",
875                        blocks_by_node.len()
876                    );
877                    match self.repair_sub_quorum_block(&block, count) {
878                        Ok(true) => {
879                            tracing::info!(
880                                "Repair succeeded — block at height {current_height} \
881                                 now has quorum"
882                            );
883                            reconciled.push(block);
884                        }
885                        Ok(false) => {
886                            tracing::warn!(
887                                "Repair failed to reach quorum at height \
888                                 {current_height} — will retry next round"
889                            );
890                            if reconciled.is_empty() {
891                                return Err(anyhow!(
892                                    "Backlog unresolved at height {current_height}: \
893                                     repair failed to reach quorum"
894                                ));
895                            }
896                            break;
897                        }
898                        Err(e) => {
899                            tracing::warn!(
900                                "Repair error at height {current_height}: {e}"
901                            );
902                            if reconciled.is_empty() {
903                                return Err(anyhow!(
904                                    "Backlog unresolved at height {current_height}: \
905                                     repair error: {e}"
906                                ));
907                            }
908                            break;
909                        }
910                    }
911                }
912            } else {
913                if reconciled.is_empty() {
914                    return Err(anyhow!(
915                        "Backlog unresolved at height {current_height}: \
916                         no winning block candidate"
917                    ));
918                }
919                break;
920            }
921
922            let Some(next) = current_height.checked_add(1) else {
923                break;
924            };
925            current_height = next;
926        }
927
928        Ok(reconciled)
929    }
930
931    async fn can_produce_block(&self) -> anyhow::Result<bool> {
932        tracing::debug!("Checking Redis leader lock");
933        if self.has_lease_owner_quorum().await? {
934            tracing::debug!(
935                quorum = self.quorum,
936                redis_nodes = self.redis_nodes.len(),
937                current_epoch_token = ?self.current_epoch_token_value(),
938                lease_key = %self.lease_key,
939                "This authority already holds leader lock quorum"
940            );
941            return Ok(true);
942        }
943        self.acquire_lease_if_free().await
944    }
945
946    async fn release_if_owner(&self) -> anyhow::Result<()> {
947        tracing::debug!("Releasing Redis leader lock");
948        if !self.has_lease_owner_quorum().await? {
949            let mut current_epoch_token = self
950                .current_epoch_token
951                .lock()
952                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
953            *current_epoch_token = None;
954            return Ok(());
955        }
956
957        let releases = futures::future::join_all(
958            self.redis_nodes
959                .iter()
960                .map(|redis_node| self.release_lease_on_node(redis_node)),
961        )
962        .await;
963        let released_count = releases.into_iter().filter(|released| *released).count();
964        if self.quorum_reached(released_count) {
965            let mut current_epoch_token = self
966                .current_epoch_token
967                .lock()
968                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
969            *current_epoch_token = None;
970            Ok(())
971        } else {
972            Err(anyhow!("Failed to release lease on quorum"))
973        }
974    }
975
976    fn current_epoch_token_value(&self) -> Option<u64> {
977        self.current_epoch_token
978            .lock()
979            .ok()
980            .and_then(|epoch| *epoch)
981    }
982
983    fn publish_block_on_all_nodes(
984        &self,
985        epoch: u64,
986        block: &SealedBlock,
987        block_data: &[u8],
988    ) -> Vec<anyhow::Result<WriteBlockResult>> {
989        // Detached `std::thread::spawn` (not scoped) lets us return as soon
990        // as `Written` quorum is reached without waiting for slow nodes.
991        // Stragglers — including any thread blocked inside the sync `redis`
992        // client — are abandoned: they will exit when their syscall returns
993        // (e.g. when the peer recovers, or when SO_RCVTIMEO fires inside
994        // `invoke_write_block_script`'s set_read_timeout/set_write_timeout
995        // window). Their result is discarded via the dropped channel.
996        //
997        // This is the class-of-failure fix for the 2026-04-22 mainnet hang
998        // where a half-alive ElastiCache node stalled one thread inside the
999        // pre-1.2 `redis::Client::get_connection_with_timeout` handshake,
1000        // which combined with the previous `std::thread::scope` to wedge
1001        // every block publish forever. The redis crate upgrade to 1.2 also
1002        // closes that specific upstream bug; the short-circuit here protects
1003        // against any future single-node hang that survives per-syscall
1004        // timeouts.
1005        let n = self.redis_nodes.len();
1006        let block_data: std::sync::Arc<[u8]> = block_data.into();
1007        let block_height = u32::from(*block.entity.header().height());
1008        let block_stream_key: std::sync::Arc<str> = self.block_stream_key.as_str().into();
1009        let epoch_key: std::sync::Arc<str> = self.epoch_key.as_str().into();
1010        let lease_key: std::sync::Arc<str> = self.lease_key.as_str().into();
1011        let lease_owner_token: std::sync::Arc<str> =
1012            self.lease_owner_token.as_str().into();
1013        let lease_ttl_millis = self.lease_ttl_millis;
1014        let stream_max_len = self.stream_max_len;
1015        let node_timeout = self.node_timeout;
1016
1017        let (tx, rx) =
1018            std::sync::mpsc::channel::<(usize, anyhow::Result<WriteBlockResult>)>();
1019
1020        for (idx, redis_node) in self.redis_nodes.iter().enumerate() {
1021            let client = redis_node.redis_client.clone();
1022            let tx = tx.clone();
1023            let block_data = block_data.clone();
1024            let block_stream_key = block_stream_key.clone();
1025            let epoch_key = epoch_key.clone();
1026            let lease_key = lease_key.clone();
1027            let lease_owner_token = lease_owner_token.clone();
1028            std::thread::spawn(move || {
1029                let result = Self::invoke_write_block_script(
1030                    &client,
1031                    node_timeout,
1032                    &block_stream_key,
1033                    &epoch_key,
1034                    &lease_key,
1035                    epoch,
1036                    &lease_owner_token,
1037                    block_height,
1038                    &block_data,
1039                    lease_ttl_millis,
1040                    stream_max_len,
1041                );
1042                // Send may fail if the receiver was dropped because we
1043                // already short-circuited on quorum — that's intentional.
1044                let _ = tx.send((idx, result));
1045            });
1046        }
1047        // Drop our local sender so `rx.recv` returns `Err` once every spawned
1048        // thread has either sent or dropped its sender.
1049        drop(tx);
1050
1051        let mut results: Vec<Option<anyhow::Result<WriteBlockResult>>> =
1052            (0..n).map(|_| None).collect();
1053        let mut written_count = 0usize;
1054        let mut received = 0usize;
1055
1056        while received < n {
1057            let Ok((idx, result)) = rx.recv() else {
1058                // All senders dropped (every thread either reported or
1059                // panicked). Stop draining and fill remaining slots below.
1060                break;
1061            };
1062            if matches!(result, Ok(WriteBlockResult::Written)) {
1063                written_count = written_count.saturating_add(1);
1064            }
1065            results[idx] = Some(result);
1066            received = received.saturating_add(1);
1067
1068            if self.quorum_reached(written_count) {
1069                // Quorum reached. Return immediately; any threads still
1070                // running are abandoned. Their later `tx.send` will fail
1071                // silently because we drop the receiver below.
1072                break;
1073            }
1074        }
1075
1076        results
1077            .into_iter()
1078            .map(|r| {
1079                r.unwrap_or_else(|| {
1080                    Err(anyhow!(
1081                        "publish abandoned: quorum reached before this \
1082                         node responded"
1083                    ))
1084                })
1085            })
1086            .collect()
1087    }
1088
1089    /// `'static` helper that runs `write_block.lua` against a single node.
1090    /// Free function (no `&self`) so detached threads spawned by
1091    /// `publish_block_on_all_nodes` can run it without borrowing the adapter.
1092    #[allow(clippy::too_many_arguments)]
1093    fn invoke_write_block_script(
1094        redis_client: &redis::Client,
1095        node_timeout: Duration,
1096        block_stream_key: &str,
1097        epoch_key: &str,
1098        lease_key: &str,
1099        epoch: u64,
1100        lease_owner_token: &str,
1101        block_height: u32,
1102        block_data: &[u8],
1103        lease_ttl_millis: u64,
1104        stream_max_len: u32,
1105    ) -> anyhow::Result<WriteBlockResult> {
1106        let mut connection = redis_client.get_connection_with_timeout(node_timeout)?;
1107        connection.set_read_timeout(Some(node_timeout))?;
1108        connection.set_write_timeout(Some(node_timeout))?;
1109        let lua_start = std::time::Instant::now();
1110        let write_result = redis::Script::new(WRITE_BLOCK_SCRIPT)
1111            .key(block_stream_key)
1112            .key(epoch_key)
1113            .key(lease_key)
1114            .arg(epoch)
1115            .arg(lease_owner_token)
1116            .arg(block_height)
1117            .arg(block_data)
1118            .arg(lease_ttl_millis)
1119            .arg(stream_max_len)
1120            .invoke::<String>(&mut connection);
1121        poa_metrics()
1122            .write_block_duration_s
1123            .observe(lua_start.elapsed().as_secs_f64());
1124        match write_result {
1125            Ok(_) => {
1126                poa_metrics().write_block_success_total.inc();
1127                Ok(WriteBlockResult::Written)
1128            }
1129            Err(err) if err.to_string().contains("HEIGHT_EXISTS:") => {
1130                poa_metrics().write_block_height_exists_total.inc();
1131                tracing::debug!(
1132                    "write_block: height already exists (height={block_height})"
1133                );
1134                Ok(WriteBlockResult::HeightExists)
1135            }
1136            Err(err) if err.to_string().contains("FENCING_ERROR:") => {
1137                poa_metrics().write_block_fencing_error_total.inc();
1138                tracing::warn!(
1139                    "write_block: fencing rejected (height={block_height}): {err}"
1140                );
1141                Ok(WriteBlockResult::FencingRejected)
1142            }
1143            Err(err) => {
1144                poa_metrics().write_block_error_total.inc();
1145                Err(err.into())
1146            }
1147        }
1148    }
1149
1150    /// Repropose a sub-quorum block to all Redis nodes to reach quorum.
1151    /// Called during reconciliation when a block exists on some nodes but
1152    /// below quorum — possibly from a leader that published and committed
1153    /// locally but whose write only reached a subset of nodes.
1154    ///
1155    /// `pre_existing_count` is the number of nodes already confirmed to
1156    /// have this specific block during the reconciliation read phase.
1157    ///
1158    /// Uses `publish_block_on_all_nodes` which runs `write_block.lua`:
1159    /// - Written: node accepted the block (counted toward quorum)
1160    /// - HEIGHT_EXISTS: node has *some* block at this height — may be a
1161    ///   different block from a competing partial write, so NOT counted
1162    /// - FENCING_ERROR: lost the lock — abort the repair
1163    /// - The total (pre_existing + newly written) must reach quorum
1164    fn repair_sub_quorum_block(
1165        &self,
1166        block: &SealedBlock,
1167        pre_existing_count: usize,
1168    ) -> anyhow::Result<bool> {
1169        let epoch = match *self
1170            .current_epoch_token
1171            .lock()
1172            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
1173        {
1174            Some(epoch) => epoch,
1175            None => {
1176                return Err(anyhow!(
1177                    "Cannot repair block because fencing token is not initialized"
1178                ));
1179            }
1180        };
1181        let block_data = postcard::to_allocvec(block)?;
1182        // Start from the pre-existing count (nodes already confirmed to
1183        // have this specific block during reconciliation). Only count
1184        // newly Written nodes — HeightExists means the node has *some*
1185        // block at this height, but it might be a different block from
1186        // a competing leader's partial write.
1187        let mut total_with_block = pre_existing_count;
1188        for result in self.publish_block_on_all_nodes(epoch, block, &block_data) {
1189            match result {
1190                Ok(WriteBlockResult::Written) => {
1191                    total_with_block = total_with_block.saturating_add(1);
1192                }
1193                Ok(WriteBlockResult::HeightExists) => {
1194                    // Node has some block at this height — may or may
1195                    // not be ours. Don't count it; the pre_existing_count
1196                    // already includes nodes confirmed to have our block.
1197                }
1198                Ok(WriteBlockResult::FencingRejected) => {
1199                    // Lost the lock — repair is invalid, abort
1200                    return Err(anyhow!(
1201                        "Lost lock during repair — another leader took over"
1202                    ));
1203                }
1204                Err(err) => {
1205                    tracing::debug!("Repair write to node failed: {err}");
1206                }
1207            }
1208        }
1209        let reached_quorum = self.quorum_reached(total_with_block);
1210        if reached_quorum {
1211            poa_metrics().repair_success_total.inc();
1212        } else {
1213            poa_metrics().repair_failure_total.inc();
1214        }
1215        Ok(reached_quorum)
1216    }
1217}
1218
/// Result of a `write_block.lua` invocation on a single Redis node.
///
/// The derives make per-node results loggable with `{:?}` and directly
/// comparable; all variants are field-less, so `Copy` is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteBlockResult {
    /// Block was successfully written to the stream.
    Written,
    /// A block at this height already exists in the stream. It is not
    /// necessarily the same block that was being written.
    HeightExists,
    /// Lock lost or epoch is stale — another leader holds the lock.
    FencingRejected,
}
1228
1229impl PoAAdapter {
1230    pub fn new(shared_state: Option<SharedState>) -> Self {
1231        Self { shared_state }
1232    }
1233
1234    pub async fn manually_produce_blocks(
1235        &self,
1236        start_time: Option<Tai64>,
1237        mode: Mode,
1238    ) -> anyhow::Result<()> {
1239        self.shared_state
1240            .as_ref()
1241            .ok_or(anyhow!("The block production is disabled"))?
1242            .manually_produce_block(start_time, mode)
1243            .await
1244    }
1245}
1246
1247#[async_trait::async_trait]
1248impl BlockReconciliationReadPort for NoopReconciliationAdapter {
1249    async fn leader_state(
1250        &self,
1251        _next_height: BlockHeight,
1252    ) -> anyhow::Result<LeaderState> {
1253        Ok(LeaderState::ReconciledLeader)
1254    }
1255
1256    async fn release(&self) -> anyhow::Result<()> {
1257        Ok(())
1258    }
1259}
1260
1261#[async_trait::async_trait]
1262impl BlockReconciliationReadPort for RedisLeaderLeaseAdapter {
1263    async fn leader_state(
1264        &self,
1265        next_height: BlockHeight,
1266    ) -> anyhow::Result<LeaderState> {
1267        if self.can_produce_block().await? {
1268            poa_metrics().is_leader.set(1);
1269            if let Ok(epoch) = self.current_epoch_token.lock()
1270                && let Some(epoch) = *epoch
1271            {
1272                poa_metrics()
1273                    .leader_epoch
1274                    .set(i64::try_from(epoch).unwrap_or(i64::MAX));
1275            }
1276            let reconcile_start = std::time::Instant::now();
1277            let unreconciled_blocks = self.unreconciled_blocks(next_height).await?;
1278            poa_metrics()
1279                .reconciliation_duration_s
1280                .observe(reconcile_start.elapsed().as_secs_f64());
1281            if unreconciled_blocks.is_empty() {
1282                Ok(LeaderState::ReconciledLeader)
1283            } else {
1284                Ok(LeaderState::UnreconciledBlocks(unreconciled_blocks))
1285            }
1286        } else {
1287            poa_metrics().is_leader.set(0);
1288            Ok(LeaderState::ReconciledFollower)
1289        }
1290    }
1291
1292    async fn release(&self) -> anyhow::Result<()> {
1293        self.release_if_owner().await
1294    }
1295}
1296
1297#[async_trait::async_trait]
1298impl BlockReconciliationReadPort for ReconciliationAdapter {
1299    async fn leader_state(
1300        &self,
1301        next_height: BlockHeight,
1302    ) -> anyhow::Result<LeaderState> {
1303        match self {
1304            Self::Redis(adapter) => adapter.leader_state(next_height).await,
1305            Self::Noop(adapter) => adapter.leader_state(next_height).await,
1306        }
1307    }
1308
1309    async fn release(&self) -> anyhow::Result<()> {
1310        match self {
1311            Self::Redis(adapter) => adapter.release().await,
1312            Self::Noop(adapter) => adapter.release().await,
1313        }
1314    }
1315}
1316
1317impl Drop for RedisLeaderLeaseAdapter {
1318    fn drop(&mut self) {
1319        if std::sync::Arc::strong_count(&self.drop_release_guard) != 1 {
1320            return;
1321        }
1322
1323        let redis_clients = self
1324            .redis_nodes
1325            .iter()
1326            .map(|redis_node| redis_node.redis_client.clone())
1327            .collect::<Vec<_>>();
1328        if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
1329            let release_future = timeout(
1330                Duration::from_millis(100),
1331                Self::release_lease_on_clients(
1332                    redis_clients,
1333                    self.lease_key.clone(),
1334                    self.lease_owner_token.clone(),
1335                    self.node_timeout,
1336                ),
1337            );
1338            drop(runtime_handle.spawn(async move {
1339                if release_future.await.is_err() {
1340                    error!("Failed to release leader lease: timeout");
1341                }
1342            }));
1343            return;
1344        }
1345
1346        Self::release_lease_on_clients_sync(
1347            redis_clients,
1348            self.lease_key.clone(),
1349            self.lease_owner_token.clone(),
1350        );
1351    }
1352}
1353
1354impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter {
1355    fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
1356        let epoch = match *self
1357            .current_epoch_token
1358            .lock()
1359            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
1360        {
1361            Some(epoch) => epoch,
1362            None => {
1363                if matches!(
1364                    block.consensus,
1365                    fuel_core_types::blockchain::consensus::Consensus::Genesis(_)
1366                ) {
1367                    tracing::debug!(
1368                        "Skipping redis block publish for genesis block because fencing token is not initialized"
1369                    );
1370                    return Ok(());
1371                }
1372                return Err(anyhow!(
1373                    "Cannot publish block because fencing token is not initialized"
1374                ));
1375            }
1376        };
1377        let block_data = postcard::to_allocvec(block)?;
1378        let successes = self
1379            .publish_block_on_all_nodes(epoch, block, &block_data)
1380            .into_iter()
1381            .map(|result| match result {
1382                Ok(WriteBlockResult::Written) => true,
1383                Ok(_) => false,
1384                Err(err) => {
1385                    tracing::debug!("Redis publish on node failed: {err}");
1386                    false
1387                }
1388            })
1389            .filter(|success| *success)
1390            .count();
1391        if self.quorum_reached(successes) {
1392            Ok(())
1393        } else {
1394            Err(anyhow!(
1395                "Failed to publish block to redis quorum with fencing checks"
1396            ))
1397        }
1398    }
1399}
1400
1401#[async_trait::async_trait]
1402impl ConsensusModulePort for PoAAdapter {
1403    async fn manually_produce_blocks(
1404        &self,
1405        start_time: Option<Tai64>,
1406        number_of_blocks: u32,
1407    ) -> anyhow::Result<()> {
1408        self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
1409            .await
1410    }
1411}
1412
#[cfg(feature = "p2p")]
impl P2pPort for P2PAdapter {
    /// Streams reserved-peer count updates from the P2P service.
    ///
    /// Items that arrive as errors from the broadcast stream are dropped.
    /// Without a configured service the returned stream never yields.
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        let Some(service) = &self.service else {
            return Box::pin(tokio_stream::pending());
        };

        let counts = BroadcastStream::new(service.subscribe_reserved_peers_count())
            .filter_map(|update| update.ok());
        Box::pin(counts)
    }
}
1426
1427#[cfg(not(feature = "p2p"))]
1428impl P2pPort for P2PAdapter {
1429    fn reserved_peers_count(&self) -> BoxStream<usize> {
1430        Box::pin(tokio_stream::pending())
1431    }
1432}
1433
/// Source of predefined blocks stored as `<height>.json` files in a
/// directory on disk.
#[derive(Debug, Clone)]
pub struct InDirectoryPredefinedBlocks {
    /// Directory holding the block files; `None` disables the lookup.
    path_to_directory: Option<PathBuf>,
}

impl InDirectoryPredefinedBlocks {
    /// Creates a source backed by `path_to_directory`, or one that never
    /// finds a block when `None` is given.
    pub fn new(path_to_directory: Option<PathBuf>) -> Self {
        Self { path_to_directory }
    }
}
1443
1444impl PredefinedBlocks for InDirectoryPredefinedBlocks {
1445    fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
1446        let Some(path) = &self.path_to_directory else {
1447            return Ok(None);
1448        };
1449
1450        let block_height: u32 = (*height).into();
1451        if block_exists(path.as_path(), block_height) {
1452            let block_path = block_path(path.as_path(), block_height);
1453            let block_bytes = std::fs::read(block_path)?;
1454            let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
1455            Ok(Some(block))
1456        } else {
1457            Ok(None)
1458        }
1459    }
1460}
1461
/// Builds the path of the predefined block file for `block_height`:
/// `<path_to_directory>/<block_height>.json`.
pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
    let mut file_path = path_to_directory.to_path_buf();
    file_path.push(format!("{block_height}.json"));
    file_path
}
1465
1466pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
1467    block_path(path_to_directory, block_height).exists()
1468}
1469
1470impl TransactionPool for TxPoolAdapter {
1471    fn new_txs_watcher(&self) -> watch::Receiver<()> {
1472        self.service.get_new_executable_txs_notifier()
1473    }
1474}
1475
1476#[async_trait::async_trait]
1477impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
1478    async fn produce_and_execute_block(
1479        &self,
1480        height: BlockHeight,
1481        block_time: Tai64,
1482        source: TransactionsSource,
1483        deadline: Instant,
1484    ) -> anyhow::Result<UncommittedResult<Changes>> {
1485        match source {
1486            TransactionsSource::TxPool => {
1487                self.block_producer
1488                    .produce_and_execute_block_txpool(height, block_time, deadline)
1489                    .await
1490            }
1491            TransactionsSource::SpecificTransactions(txs) => {
1492                self.block_producer
1493                    .produce_and_execute_block_transactions(height, block_time, txs)
1494                    .await
1495            }
1496        }
1497    }
1498
1499    async fn produce_predefined_block(
1500        &self,
1501        block: &Block,
1502    ) -> anyhow::Result<UncommittedResult<Changes>> {
1503        self.block_producer
1504            .produce_and_execute_predefined(block, ())
1505            .await
1506    }
1507}
1508
1509#[async_trait::async_trait]
1510impl BlockImporter for BlockImporterAdapter {
1511    async fn commit_result(
1512        &self,
1513        result: UncommittedImporterResult<Changes>,
1514    ) -> anyhow::Result<()> {
1515        self.block_importer
1516            .commit_result(result)
1517            .await
1518            .map_err(Into::into)
1519    }
1520
1521    async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> {
1522        self.block_importer
1523            .execute_and_commit(block)
1524            .await
1525            .map_err(Into::into)
1526    }
1527
1528    fn block_stream(&self) -> BoxStream<BlockImportInfo> {
1529        Box::pin(
1530            BroadcastStream::new(self.block_importer.subscribe())
1531                .filter_map(|result| result.ok())
1532                .map(|result| BlockImportInfo::from(result.shared_result)),
1533        )
1534    }
1535
1536    fn latest_block_height(&self) -> anyhow::Result<Option<BlockHeight>> {
1537        self.database.latest_block_height().map_err(Into::into)
1538    }
1539}
1540
1541#[cfg(all(test, feature = "leader_lock", not(feature = "not_leader_lock")))]
1542#[allow(non_snake_case)]
1543mod tests {
1544    use super::*;
1545    use fuel_core_importer::ports::BlockReconciliationWritePort;
1546    use fuel_core_poa::ports::BlockReconciliationReadPort;
1547    use fuel_core_types::blockchain::consensus::Consensus;
1548    use std::{
1549        io::Read as _,
1550        net::{
1551            SocketAddrV4,
1552            TcpListener,
1553            TcpStream,
1554        },
1555        process::{
1556            Child,
1557            Command,
1558            Stdio,
1559        },
1560        sync::mpsc,
1561        thread,
1562        time::{
1563            Duration,
1564            Instant,
1565        },
1566    };
1567
1568    #[tokio::test(flavor = "multi_thread")]
1569    async fn leader_state__when_same_height_has_multiple_stream_entries_then_returns_highest_epoch_block()
1570     {
1571        // given
1572        let redis = RedisTestServer::spawn();
1573        let lease_key = "poa:test:stream-conflict".to_string();
1574        let stream_key = format!("{lease_key}:block:stream");
1575        let adapter = RedisLeaderLeaseAdapter::new(
1576            vec![redis.redis_url()],
1577            lease_key,
1578            Duration::from_secs(2),
1579            Duration::from_millis(100),
1580            Duration::from_millis(50),
1581            Duration::from_millis(0),
1582            1,
1583            1000,
1584        )
1585        .expect("adapter should be created");
1586
1587        let low_epoch_block = poa_block_at_time(1, 10);
1588        let high_epoch_block = poa_block_at_time(1, 20);
1589
1590        let low_epoch_data =
1591            postcard::to_allocvec(&low_epoch_block).expect("serialize block");
1592        let high_epoch_data =
1593            postcard::to_allocvec(&high_epoch_block).expect("serialize block");
1594
1595        let redis_client =
1596            redis::Client::open(redis.redis_url()).expect("redis client should open");
1597        let mut conn = redis_client
1598            .get_connection()
1599            .expect("redis connection should open");
1600        append_stream_block(&mut conn, &stream_key, 1, &low_epoch_data, 1);
1601        append_stream_block(&mut conn, &stream_key, 1, &high_epoch_data, 2);
1602
1603        // when
1604        let leader_state = adapter
1605            .leader_state(1.into())
1606            .await
1607            .expect("leader_state should succeed");
1608
1609        // then
1610        let unreconciled_blocks = match leader_state {
1611            LeaderState::UnreconciledBlocks(blocks) => blocks,
1612            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1613        };
1614        assert_eq!(unreconciled_blocks.len(), 1);
1615        assert_eq!(
1616            unreconciled_blocks[0].entity.header().time(),
1617            high_epoch_block.entity.header().time(),
1618            "Expected reconciliation to pick the highest epoch block for the same height",
1619        );
1620    }
1621
1622    #[tokio::test(flavor = "multi_thread")]
1623    async fn leader_state__when_same_height_same_epoch_has_multiple_stream_entries_then_keeps_latest_entry()
1624     {
1625        // given
1626        let redis = RedisTestServer::spawn();
1627        let lease_key = "poa:test:equal-epoch-latest-entry".to_string();
1628        let stream_key = format!("{lease_key}:block:stream");
1629        let adapter = RedisLeaderLeaseAdapter::new(
1630            vec![redis.redis_url()],
1631            lease_key,
1632            Duration::from_secs(2),
1633            Duration::from_millis(100),
1634            Duration::from_millis(50),
1635            Duration::from_millis(0),
1636            1,
1637            1000,
1638        )
1639        .expect("adapter should be created");
1640
1641        let stale_block = poa_block_at_time(1, 10);
1642        let retry_block = poa_block_at_time(1, 20);
1643        let stale_data =
1644            postcard::to_allocvec(&stale_block).expect("stale block should serialize");
1645        let retry_data =
1646            postcard::to_allocvec(&retry_block).expect("retry block should serialize");
1647
1648        let redis_client =
1649            redis::Client::open(redis.redis_url()).expect("redis client should open");
1650        let mut conn = redis_client
1651            .get_connection()
1652            .expect("redis connection should open");
1653        append_stream_block(&mut conn, &stream_key, 1, &stale_data, 1);
1654        append_stream_block(&mut conn, &stream_key, 1, &retry_data, 1);
1655
1656        // when
1657        let leader_state = adapter
1658            .leader_state(1.into())
1659            .await
1660            .expect("leader_state should succeed");
1661
1662        // then
1663        let unreconciled_blocks = match leader_state {
1664            LeaderState::UnreconciledBlocks(blocks) => blocks,
1665            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1666        };
1667        assert_eq!(unreconciled_blocks.len(), 1);
1668        assert_eq!(
1669            unreconciled_blocks[0].entity.id(),
1670            retry_block.entity.id(),
1671            "Expected reconciliation to keep latest stream entry for equal epoch",
1672        );
1673    }
1674
1675    #[tokio::test(flavor = "multi_thread")]
1676    async fn leader_state__when_height_has_disagreeing_block_ids_then_repairs_with_highest_epoch_block()
1677     {
1678        // given: two different blocks at height 1 on different nodes, same epoch
1679        let redis_a = RedisTestServer::spawn();
1680        let redis_b = RedisTestServer::spawn();
1681        let redis_c = RedisTestServer::spawn();
1682        let lease_key = "poa:test:epoch-quorum-block-mismatch".to_string();
1683        let stream_key = format!("{lease_key}:block:stream");
1684        let adapter = new_test_adapter(
1685            vec![
1686                redis_a.redis_url(),
1687                redis_b.redis_url(),
1688                redis_c.redis_url(),
1689            ],
1690            lease_key,
1691        );
1692        assert!(
1693            adapter
1694                .acquire_lease_if_free()
1695                .await
1696                .expect("acquire should succeed"),
1697            "adapter should acquire lease"
1698        );
1699
1700        let block_a = poa_block_at_time(1, 10);
1701        let block_b = poa_block_at_time(1, 20);
1702        let block_a_data =
1703            postcard::to_allocvec(&block_a).expect("block a should serialize");
1704        let block_b_data =
1705            postcard::to_allocvec(&block_b).expect("block b should serialize");
1706
1707        let redis_a_client =
1708            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1709        let redis_b_client =
1710            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1711        let mut conn_a = redis_a_client
1712            .get_connection()
1713            .expect("redis a connection should open");
1714        let mut conn_b = redis_b_client
1715            .get_connection()
1716            .expect("redis b connection should open");
1717
1718        // Both at epoch 7 but different block data — each on 1 node (sub-quorum)
1719        append_stream_block(&mut conn_a, &stream_key, 1, &block_a_data, 7);
1720        append_stream_block(&mut conn_b, &stream_key, 1, &block_b_data, 7);
1721
1722        // when: leader reconciles — should pick one and repair to quorum
1723        // The repair writes to node C (empty), giving the winner 2/3
1724        let leader_state = adapter
1725            .leader_state(1.into())
1726            .await
1727            .expect("leader_state should succeed");
1728
1729        // then: one of the blocks is repaired and returned
1730        assert!(
1731            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1732            "Expected repair to pick one block and reach quorum, got {leader_state:?}",
1733        );
1734    }
1735
1736    /// Reproduces the devnet deadlock from April 17, 2026.
1737    ///
1738    /// The same block was written to all 3 nodes during re-promotion storms,
1739    /// so each node has the same block_id but with different epoch metadata.
1740    /// The old `(epoch, block_id)` vote grouping fragmented these into
1741    /// separate vote groups, with the max-epoch group having a count below
1742    /// quorum. Repair then failed because every node returned HEIGHT_EXISTS.
1743    ///
1744    /// With the fix (grouping by block_id only), all copies of the same
1745    /// block count toward quorum regardless of epoch metadata — so this
1746    /// state resolves without repair.
1747    #[tokio::test(flavor = "multi_thread")]
1748    async fn leader_state__when_same_block_has_different_epochs_across_nodes_then_reconciles_without_repair()
1749     {
1750        // given: same block on all 3 nodes, but with different epochs
1751        // (as happens when re-promotion writes race during production)
1752        let redis_a = RedisTestServer::spawn();
1753        let redis_b = RedisTestServer::spawn();
1754        let redis_c = RedisTestServer::spawn();
1755        let lease_key = "poa:test:same-block-different-epochs".to_string();
1756        let stream_key = format!("{lease_key}:block:stream");
1757        let adapter = new_test_adapter(
1758            vec![
1759                redis_a.redis_url(),
1760                redis_b.redis_url(),
1761                redis_c.redis_url(),
1762            ],
1763            lease_key,
1764        );
1765        assert!(
1766            adapter
1767                .acquire_lease_if_free()
1768                .await
1769                .expect("acquire should succeed"),
1770            "adapter should acquire lease"
1771        );
1772
1773        // Same block (same data, same block_id) on all 3 nodes, but each
1774        // with a different epoch. This simulates what happens when the
1775        // original leader was re-promoted repeatedly during a race,
1776        // writing the same block content each time with a bumped epoch.
1777        let block = poa_block_at_time(1, 10);
1778        let block_data = postcard::to_allocvec(&block).expect("should serialize");
1779
1780        let redis_a_client =
1781            redis::Client::open(redis_a.redis_url()).expect("redis a client");
1782        let redis_b_client =
1783            redis::Client::open(redis_b.redis_url()).expect("redis b client");
1784        let redis_c_client =
1785            redis::Client::open(redis_c.redis_url()).expect("redis c client");
1786        let mut conn_a = redis_a_client.get_connection().expect("redis a conn");
1787        let mut conn_b = redis_b_client.get_connection().expect("redis b conn");
1788        let mut conn_c = redis_c_client.get_connection().expect("redis c conn");
1789
1790        // Same block_id, three different epochs
1791        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 5);
1792        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 7);
1793        append_stream_block(&mut conn_c, &stream_key, 1, &block_data, 9);
1794
1795        // when: leader reconciles
1796        let leader_state = adapter
1797            .leader_state(1.into())
1798            .await
1799            .expect("leader_state should succeed");
1800
1801        // then: the block is reconciled directly (no repair needed). Without
1802        // the fix, the old logic would have split the 3 copies into 3 vote
1803        // groups and tried to repair the max-epoch group (count=1), which
1804        // would deadlock because every node returns HEIGHT_EXISTS.
1805        assert!(
1806            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1807            "Expected block to be reconciled from quorum across mixed epochs, got {leader_state:?}"
1808        );
1809    }
1810
1811    #[tokio::test(flavor = "multi_thread")]
1812    async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it()
1813     {
1814        // given: orphan block on only 1 of 3 nodes (below quorum)
1815        let redis_a = RedisTestServer::spawn();
1816        let redis_b = RedisTestServer::spawn();
1817        let redis_c = RedisTestServer::spawn();
1818        let lease_key = "poa:test:below-quorum".to_string();
1819        let stream_key = format!("{lease_key}:block:stream");
1820        let adapter = new_test_adapter(
1821            vec![
1822                redis_a.redis_url(),
1823                redis_b.redis_url(),
1824                redis_c.redis_url(),
1825            ],
1826            lease_key,
1827        );
1828        assert!(
1829            adapter
1830                .acquire_lease_if_free()
1831                .await
1832                .expect("acquire should succeed"),
1833            "adapter should acquire lease"
1834        );
1835
1836        let orphan_block = poa_block_at_time(1, 10);
1837        let orphan_block_data =
1838            postcard::to_allocvec(&orphan_block).expect("orphan block should serialize");
1839
1840        let redis_client =
1841            redis::Client::open(redis_a.redis_url()).expect("redis client should open");
1842        let mut conn = redis_client
1843            .get_connection()
1844            .expect("redis connection should open");
1845        append_stream_block(&mut conn, &stream_key, 1, &orphan_block_data, 1);
1846
1847        // when: leader reconciles — should repair the orphan to quorum
1848        let leader_state = adapter
1849            .leader_state(1.into())
1850            .await
1851            .expect("leader_state should succeed");
1852
1853        // then: orphan was reproposed to other nodes and returned for import
1854        assert!(
1855            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1856            "Expected sub-quorum entry to be repaired and returned, got {leader_state:?}"
1857        );
1858    }
1859
1860    #[tokio::test(flavor = "multi_thread")]
1861    async fn leader_state__when_contiguous_heights_have_quorum_then_repairs_sub_quorum_tail()
1862     {
1863        // given: h1, h2 on quorum (a,b). h3 below quorum (a only).
1864        let redis_a = RedisTestServer::spawn();
1865        let redis_b = RedisTestServer::spawn();
1866        let redis_c = RedisTestServer::spawn();
1867        let lease_key = "poa:test:contiguous-quorum".to_string();
1868        let stream_key = format!("{lease_key}:block:stream");
1869        let adapter = new_test_adapter(
1870            vec![
1871                redis_a.redis_url(),
1872                redis_b.redis_url(),
1873                redis_c.redis_url(),
1874            ],
1875            lease_key,
1876        );
1877        assert!(
1878            adapter
1879                .acquire_lease_if_free()
1880                .await
1881                .expect("acquire should succeed"),
1882            "adapter should acquire lease"
1883        );
1884
1885        let h1 = poa_block_at_time(1, 10);
1886        let h2 = poa_block_at_time(2, 20);
1887        let h3 = poa_block_at_time(3, 30);
1888        let h1_data = postcard::to_allocvec(&h1).expect("h1 should serialize");
1889        let h2_data = postcard::to_allocvec(&h2).expect("h2 should serialize");
1890        let h3_data = postcard::to_allocvec(&h3).expect("h3 should serialize");
1891
1892        let redis_a_client =
1893            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1894        let redis_b_client =
1895            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1896        let mut conn_a = redis_a_client
1897            .get_connection()
1898            .expect("redis a connection should open");
1899        let mut conn_b = redis_b_client
1900            .get_connection()
1901            .expect("redis b connection should open");
1902
1903        // h1 on quorum (a,b)
1904        append_stream_block(&mut conn_a, &stream_key, 1, &h1_data, 1);
1905        append_stream_block(&mut conn_b, &stream_key, 1, &h1_data, 1);
1906        // h2 on quorum (a,b)
1907        append_stream_block(&mut conn_a, &stream_key, 2, &h2_data, 1);
1908        append_stream_block(&mut conn_b, &stream_key, 2, &h2_data, 1);
1909        // h3 below quorum (a only)
1910        append_stream_block(&mut conn_a, &stream_key, 3, &h3_data, 1);
1911
1912        // when: leader reconciles — h3 should be repaired to quorum
1913        let leader_state = adapter
1914            .leader_state(1.into())
1915            .await
1916            .expect("leader_state should succeed");
1917
1918        // then: all 3 heights returned (h3 was repaired)
1919        let unreconciled_blocks = match leader_state {
1920            LeaderState::UnreconciledBlocks(blocks) => blocks,
1921            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1922        };
1923        assert_eq!(
1924            unreconciled_blocks
1925                .iter()
1926                .map(|b| u32::from(*b.entity.header().height()))
1927                .collect::<Vec<_>>(),
1928            vec![1, 2, 3],
1929            "Expected all heights including repaired sub-quorum h3",
1930        );
1931    }
1932
1933    #[tokio::test(flavor = "multi_thread")]
1934    async fn leader_state__when_contiguous_quorum_blocks_are_present_then_returns_all_available_contiguous_blocks()
1935     {
1936        // given
1937        let redis_a = RedisTestServer::spawn();
1938        let redis_b = RedisTestServer::spawn();
1939        let redis_c = RedisTestServer::spawn();
1940        let lease_key = "poa:test:contiguous-over-128".to_string();
1941        let stream_key = format!("{lease_key}:block:stream");
1942        let adapter = RedisLeaderLeaseAdapter::new(
1943            vec![
1944                redis_a.redis_url(),
1945                redis_b.redis_url(),
1946                redis_c.redis_url(),
1947            ],
1948            lease_key,
1949            Duration::from_secs(2),
1950            Duration::from_millis(100),
1951            Duration::from_millis(50),
1952            Duration::from_millis(0),
1953            1,
1954            1000,
1955        )
1956        .expect("adapter should be created");
1957        let redis_a_client =
1958            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1959        let redis_b_client =
1960            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1961        let redis_c_client =
1962            redis::Client::open(redis_c.redis_url()).expect("redis c client should open");
1963        let mut conn_a = redis_a_client
1964            .get_connection()
1965            .expect("redis a connection should open");
1966        let mut conn_b = redis_b_client
1967            .get_connection()
1968            .expect("redis b connection should open");
1969        let mut conn_c = redis_c_client
1970            .get_connection()
1971            .expect("redis c connection should open");
1972        let _ = &mut conn_c;
1973
1974        (1_u32..=129_u32).for_each(|height| {
1975            let block = poa_block_at_time(height, u64::from(height));
1976            let block_data =
1977                postcard::to_allocvec(&block).expect("block should serialize");
1978            append_stream_block(&mut conn_a, &stream_key, height, &block_data, 1);
1979            append_stream_block(&mut conn_b, &stream_key, height, &block_data, 1);
1980        });
1981
1982        // when
1983        let leader_state = adapter
1984            .leader_state(1.into())
1985            .await
1986            .expect("leader_state should succeed");
1987
1988        // then
1989        let unreconciled_blocks = match leader_state {
1990            LeaderState::UnreconciledBlocks(blocks) => blocks,
1991            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1992        };
1993        assert_eq!(
1994            unreconciled_blocks.len(),
1995            129,
1996            "Expected all contiguous quorum-backed heights to reconcile in one call",
1997        );
1998    }
1999
2000    #[tokio::test(flavor = "multi_thread")]
2001    async fn publish_produced_block__when_fencing_token_is_uninitialized_then_returns_error()
2002     {
2003        // given
2004        let redis_a = RedisTestServer::spawn();
2005        let redis_b = RedisTestServer::spawn();
2006        let redis_c = RedisTestServer::spawn();
2007        let lease_key = "poa:test:missing-epoch".to_string();
2008        let redis_urls = vec![
2009            redis_a.redis_url(),
2010            redis_b.redis_url(),
2011            redis_c.redis_url(),
2012        ];
2013        let adapter = new_test_adapter(redis_urls, lease_key);
2014        let block = poa_block_at_time(1, 100);
2015
2016        // when
2017        let publish_result = adapter.publish_produced_block(&block);
2018
2019        // then
2020        assert!(
2021            publish_result.is_err(),
2022            "Publish should fail when fencing token is not initialized"
2023        );
2024    }
2025
2026    #[tokio::test(flavor = "multi_thread")]
2027    async fn release__when_adapter_is_not_lease_owner_then_returns_ok() {
2028        // given
2029        let redis_a = RedisTestServer::spawn();
2030        let redis_b = RedisTestServer::spawn();
2031        let redis_c = RedisTestServer::spawn();
2032        let lease_key = "poa:test:release-follower".to_string();
2033        let redis_urls = vec![
2034            redis_a.redis_url(),
2035            redis_b.redis_url(),
2036            redis_c.redis_url(),
2037        ];
2038        let adapter = new_test_adapter(redis_urls, lease_key);
2039
2040        // when
2041        let release_result = adapter.release().await;
2042
2043        // then
2044        assert!(
2045            release_result.is_ok(),
2046            "Release should be idempotent for adapters that do not own quorum lease"
2047        );
2048    }
2049
2050    #[tokio::test(flavor = "multi_thread")]
2051    async fn drop__when_non_last_clone_is_dropped_then_does_not_release_shared_lease() {
2052        // given
2053        let redis_a = RedisTestServer::spawn();
2054        let redis_b = RedisTestServer::spawn();
2055        let redis_c = RedisTestServer::spawn();
2056        let lease_key = "poa:test:drop-non-last-clone".to_string();
2057        let redis_urls = vec![
2058            redis_a.redis_url(),
2059            redis_b.redis_url(),
2060            redis_c.redis_url(),
2061        ];
2062        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
2063        assert!(
2064            adapter
2065                .acquire_lease_if_free()
2066                .await
2067                .expect("acquire should succeed"),
2068            "Adapter should acquire lease"
2069        );
2070        let adapter_clone = adapter.clone();
2071        let owner_token = adapter.lease_owner_token.clone();
2072
2073        // when
2074        drop(adapter_clone);
2075        sleep(Duration::from_millis(50)).await;
2076
2077        // then
2078        let owners = redis_urls
2079            .iter()
2080            .filter(|redis_url| {
2081                read_lease_owner(redis_url, &lease_key).as_deref()
2082                    == Some(owner_token.as_str())
2083            })
2084            .count();
2085        assert!(
2086            owners >= 2,
2087            "Dropping a non-last clone must not release quorum lease ownership"
2088        );
2089    }
2090
2091    #[tokio::test(flavor = "multi_thread")]
2092    async fn leader_state__when_lease_is_free_then_acquires_quorum_ownership() {
2093        // given
2094        let redis_a = RedisTestServer::spawn();
2095        let redis_b = RedisTestServer::spawn();
2096        let redis_c = RedisTestServer::spawn();
2097        let lease_key = "poa:test:acquire-on-leader-state".to_string();
2098        let redis_urls = vec![
2099            redis_a.redis_url(),
2100            redis_b.redis_url(),
2101            redis_c.redis_url(),
2102        ];
2103        let adapter = RedisLeaderLeaseAdapter::new(
2104            redis_urls.clone(),
2105            lease_key.clone(),
2106            Duration::from_millis(500),
2107            Duration::from_millis(100),
2108            Duration::from_millis(50),
2109            Duration::from_millis(0),
2110            1,
2111            1000,
2112        )
2113        .expect("adapter should be created");
2114
2115        // when
2116        let state = adapter
2117            .leader_state(1.into())
2118            .await
2119            .expect("leader_state should succeed");
2120        let owners = redis_urls
2121            .iter()
2122            .filter(|redis_url| {
2123                read_lease_owner(redis_url, &lease_key).as_deref()
2124                    == Some(adapter.lease_owner_token.as_str())
2125            })
2126            .count();
2127
2128        // then
2129        assert!(
2130            matches!(state, LeaderState::ReconciledLeader),
2131            "leader_state should acquire and report leader ownership when lease is free"
2132        );
2133        assert!(
2134            owners >= 2,
2135            "Lease ownership should be present on quorum after acquisition"
2136        );
2137    }
2138
2139    #[tokio::test(flavor = "multi_thread")]
2140    async fn leader_state__when_lease_expires_then_another_adapter_becomes_leader() {
2141        // given
2142        let redis_a = RedisTestServer::spawn();
2143        let redis_b = RedisTestServer::spawn();
2144        let redis_c = RedisTestServer::spawn();
2145        let lease_key = "poa:test:ttl-expiry-handoff".to_string();
2146        let redis_urls = vec![
2147            redis_a.redis_url(),
2148            redis_b.redis_url(),
2149            redis_c.redis_url(),
2150        ];
2151        let first_adapter = RedisLeaderLeaseAdapter::new(
2152            redis_urls.clone(),
2153            lease_key.clone(),
2154            Duration::from_millis(300),
2155            Duration::from_millis(100),
2156            Duration::from_millis(50),
2157            Duration::from_millis(0),
2158            1,
2159            1000,
2160        )
2161        .expect("first adapter should be created");
2162        let second_adapter = RedisLeaderLeaseAdapter::new(
2163            redis_urls.clone(),
2164            lease_key.clone(),
2165            Duration::from_millis(300),
2166            Duration::from_millis(100),
2167            Duration::from_millis(50),
2168            Duration::from_millis(0),
2169            1,
2170            1000,
2171        )
2172        .expect("second adapter should be created");
2173
2174        let first_state = first_adapter
2175            .leader_state(1.into())
2176            .await
2177            .expect("first leader_state should succeed");
2178        sleep(Duration::from_millis(900)).await;
2179
2180        // when
2181        let second_state = second_adapter
2182            .leader_state(1.into())
2183            .await
2184            .expect("second leader_state should succeed");
2185        let second_owner_count = redis_urls
2186            .iter()
2187            .filter(|redis_url| {
2188                read_lease_owner(redis_url, &lease_key).as_deref()
2189                    == Some(second_adapter.lease_owner_token.as_str())
2190            })
2191            .count();
2192
2193        // then
2194        assert!(
2195            matches!(first_state, LeaderState::ReconciledLeader),
2196            "First adapter should acquire lease initially"
2197        );
2198        assert!(
2199            matches!(second_state, LeaderState::ReconciledLeader),
2200            "Second adapter should become leader after TTL expiry"
2201        );
2202        assert!(
2203            second_owner_count >= 2,
2204            "Second adapter should own lease on quorum nodes after takeover"
2205        );
2206    }
2207
2208    #[tokio::test(flavor = "multi_thread")]
2209    async fn publish_produced_block__when_previous_leader_writes_after_handoff_then_rejects_zombie_write()
2210     {
2211        // given
2212        let redis_a = RedisTestServer::spawn();
2213        let redis_b = RedisTestServer::spawn();
2214        let redis_c = RedisTestServer::spawn();
2215        let lease_key = "poa:test:zombie-leader".to_string();
2216        let redis_urls = vec![
2217            redis_a.redis_url(),
2218            redis_b.redis_url(),
2219            redis_c.redis_url(),
2220        ];
2221        let old_leader = new_test_adapter(redis_urls.clone(), lease_key.clone());
2222        let current_leader = new_test_adapter(redis_urls, lease_key.clone());
2223        let block = poa_block_at_time(1, 111);
2224
2225        assert!(
2226            old_leader
2227                .acquire_lease_if_free()
2228                .await
2229                .expect("acquire should succeed"),
2230            "Old leader should acquire initial lease"
2231        );
2232        clear_lease_on_nodes(
2233            &[
2234                redis_a.redis_url(),
2235                redis_b.redis_url(),
2236                redis_c.redis_url(),
2237            ],
2238            &lease_key,
2239        );
2240        assert!(
2241            current_leader
2242                .acquire_lease_if_free()
2243                .await
2244                .expect("acquire should succeed"),
2245            "Current leader should acquire lease after handoff"
2246        );
2247
2248        // when
2249        let zombie_write = old_leader.publish_produced_block(&block);
2250
2251        // then
2252        assert!(
2253            zombie_write.is_err(),
2254            "Old leader write should be fenced after handoff"
2255        );
2256        let current_state = current_leader
2257            .leader_state(1.into())
2258            .await
2259            .expect("leader_state should succeed");
2260        assert!(
2261            matches!(current_state, LeaderState::ReconciledLeader),
2262            "Zombie partial writes must not be considered committed"
2263        );
2264    }
2265
2266    #[tokio::test(flavor = "multi_thread")]
2267    async fn publish_produced_block__when_epoch_is_behind_on_one_node_then_first_write_heals_epoch()
2268     {
2269        // given
2270        let redis_a = RedisTestServer::spawn();
2271        let redis_b = RedisTestServer::spawn();
2272        let redis_c = RedisTestServer::spawn();
2273        let lease_key = "poa:test:epoch-healing".to_string();
2274        let redis_urls = vec![
2275            redis_a.redis_url(),
2276            redis_b.redis_url(),
2277            redis_c.redis_url(),
2278        ];
2279        let adapter = new_test_adapter(redis_urls, lease_key.clone());
2280        let epoch_key = format!("{lease_key}:epoch:token");
2281        let block = poa_block_at_time(1, 222);
2282        assert!(
2283            adapter
2284                .acquire_lease_if_free()
2285                .await
2286                .expect("acquire should succeed"),
2287            "Adapter should acquire lease"
2288        );
2289        let leader_epoch = (*adapter.current_epoch_token.lock().expect("poisoned lock"))
2290            .expect("epoch should be initialized");
2291        let stale_epoch = leader_epoch.saturating_sub(1);
2292        set_epoch(&redis_a.redis_url(), &epoch_key, stale_epoch);
2293
2294        // when
2295        let publish_result = adapter.publish_produced_block(&block);
2296
2297        // then
2298        assert!(
2299            publish_result.is_ok(),
2300            "Publish should still succeed on quorum"
2301        );
2302        let healed_epoch = read_epoch(&redis_a.redis_url(), &epoch_key);
2303        assert_eq!(
2304            healed_epoch, leader_epoch,
2305            "First successful write should heal lagging epoch"
2306        );
2307    }
2308
2309    #[tokio::test(flavor = "multi_thread")]
2310    async fn publish_produced_block__when_write_succeeds_then_extends_lease_ttl() {
2311        // given
2312        let redis_a = RedisTestServer::spawn();
2313        let redis_b = RedisTestServer::spawn();
2314        let redis_c = RedisTestServer::spawn();
2315        let lease_key = "poa:test:publish-extends-lease-ttl".to_string();
2316        let redis_urls = vec![
2317            redis_a.redis_url(),
2318            redis_b.redis_url(),
2319            redis_c.redis_url(),
2320        ];
2321        let adapter = RedisLeaderLeaseAdapter::new(
2322            redis_urls.clone(),
2323            lease_key.clone(),
2324            Duration::from_millis(700),
2325            Duration::from_millis(100),
2326            Duration::from_millis(50),
2327            Duration::from_millis(0),
2328            1,
2329            1000,
2330        )
2331        .expect("adapter should be created");
2332        let block = poa_block_at_time(1, 444);
2333        assert!(
2334            adapter
2335                .acquire_lease_if_free()
2336                .await
2337                .expect("acquire should succeed"),
2338            "Adapter should acquire lease"
2339        );
2340        sleep(Duration::from_millis(500)).await;
2341
2342        // when
2343        let publish_result = adapter.publish_produced_block(&block);
2344        sleep(Duration::from_millis(400)).await;
2345        let owners = redis_urls
2346            .iter()
2347            .filter(|redis_url| {
2348                read_lease_owner(redis_url, &lease_key).as_deref()
2349                    == Some(adapter.lease_owner_token.as_str())
2350            })
2351            .count();
2352
2353        // then
2354        assert!(publish_result.is_ok(), "Publish should succeed on quorum");
2355        assert!(
2356            owners >= 2,
2357            "Successful write should extend lease TTL on quorum beyond original window"
2358        );
2359    }
2360
2361    #[tokio::test(flavor = "multi_thread")]
2362    async fn publish_produced_block__when_write_succeeds_on_less_than_quorum_then_entry_is_not_reconciled()
2363     {
2364        // given
2365        let redis_a = RedisTestServer::spawn();
2366        let redis_b = RedisTestServer::spawn();
2367        let redis_c = RedisTestServer::spawn();
2368        let lease_key = "poa:test:partial-write".to_string();
2369        let stream_key = format!("{lease_key}:block:stream");
2370        let redis_urls = vec![
2371            redis_a.redis_url(),
2372            redis_b.redis_url(),
2373            redis_c.redis_url(),
2374        ];
2375        let adapter = new_test_adapter(redis_urls, lease_key.clone());
2376        let block = poa_block_at_time(1, 333);
2377        assert!(
2378            adapter
2379                .acquire_lease_if_free()
2380                .await
2381                .expect("acquire should succeed"),
2382            "Adapter should acquire lease"
2383        );
2384        set_lease_owner(
2385            &redis_b.redis_url(),
2386            &lease_key,
2387            "other-owner",
2388            adapter.lease_ttl_millis,
2389        );
2390        set_lease_owner(
2391            &redis_c.redis_url(),
2392            &lease_key,
2393            "other-owner",
2394            adapter.lease_ttl_millis,
2395        );
2396
2397        // when
2398        let publish_result = adapter.publish_produced_block(&block);
2399        let unreconciled = adapter.unreconciled_blocks(1.into()).await;
2400
2401        // then
2402        assert!(
2403            publish_result.is_err(),
2404            "Publish must fail when fewer than quorum nodes accept write"
2405        );
2406        assert!(
2407            unreconciled.is_err(),
2408            "Unresolved backlog should return an error instead of empty result"
2409        );
2410        assert_eq!(
2411            stream_len(&redis_a.redis_url(), &stream_key),
2412            1,
2413            "One orphan entry should exist on the single successful node"
2414        );
2415    }
2416
2417    #[tokio::test(flavor = "multi_thread")]
2418    async fn unreconciled_blocks__when_quorum_latest_height_is_below_next_height_then_returns_empty()
2419     {
2420        // given
2421        let redis = RedisTestServer::spawn();
2422        let lease_key = "poa:test:cursor-fast-path".to_string();
2423        let stream_key = format!("{lease_key}:block:stream");
2424        let adapter = RedisLeaderLeaseAdapter::new(
2425            vec![redis.redis_url()],
2426            lease_key,
2427            Duration::from_secs(2),
2428            Duration::from_millis(100),
2429            Duration::from_millis(50),
2430            Duration::from_millis(0),
2431            1,
2432            1000,
2433        )
2434        .expect("adapter should be created");
2435        let block = poa_block_at_time(1, 10);
2436        let block_data = postcard::to_allocvec(&block).expect("serialize block");
2437        let redis_client =
2438            redis::Client::open(redis.redis_url()).expect("redis client should open");
2439        let mut conn = redis_client
2440            .get_connection()
2441            .expect("redis connection should open");
2442        append_stream_block(&mut conn, &stream_key, 1, &block_data, 1);
2443
2444        // when
2445        let blocks = adapter
2446            .unreconciled_blocks(2.into())
2447            .await
2448            .expect("reconciliation read should succeed");
2449
2450        // then
2451        assert!(
2452            blocks.is_empty(),
2453            "Expected fast path to skip full reconciliation"
2454        );
2455    }
2456
2457    #[tokio::test(flavor = "multi_thread")]
2458    async fn read_stream_entries_on_node__when_next_height_is_provided_then_reads_matching_entries()
2459     {
2460        // given
2461        let redis = RedisTestServer::spawn();
2462        let lease_key = "poa:test:cursor-incremental-read".to_string();
2463        let stream_key = format!("{lease_key}:block:stream");
2464        let adapter = RedisLeaderLeaseAdapter::new(
2465            vec![redis.redis_url()],
2466            lease_key,
2467            Duration::from_secs(2),
2468            Duration::from_millis(100),
2469            Duration::from_millis(50),
2470            Duration::from_millis(0),
2471            1,
2472            1000,
2473        )
2474        .expect("adapter should be created");
2475        let redis_client =
2476            redis::Client::open(redis.redis_url()).expect("redis client should open");
2477        let mut conn = redis_client
2478            .get_connection()
2479            .expect("redis connection should open");
2480        let h1 = poa_block_at_time(1, 10);
2481        let h2 = poa_block_at_time(2, 20);
2482        let h3 = poa_block_at_time(3, 30);
2483        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
2484        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
2485        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
2486        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
2487        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
2488        let redis_node = adapter.redis_nodes[0].clone();
2489
2490        // when
2491        let first_read = adapter
2492            .read_stream_entries_on_node(&redis_node, 1, 1000)
2493            .await
2494            .expect("first read should succeed");
2495        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
2496        let second_read = adapter
2497            .read_stream_entries_on_node(&redis_node, 3, 1000)
2498            .await
2499            .expect("second read should succeed");
2500
2501        // then
2502        assert_eq!(
2503            first_read.len(),
2504            2,
2505            "Expected initial read to include existing entries"
2506        );
2507        assert_eq!(
2508            second_read.len(),
2509            1,
2510            "Expected height-filtered read to include only matching entries"
2511        );
2512        assert_eq!(
2513            u32::from(*second_read[0].2.entity.header().height()),
2514            3,
2515            "Expected only the requested next height"
2516        );
2517    }
2518
2519    #[tokio::test(flavor = "multi_thread")]
2520    async fn read_stream_entries_on_node__when_max_entries_is_small_then_caps_results() {
2521        // given
2522        let redis = RedisTestServer::spawn();
2523        let lease_key = "poa:test:cursor-pagination".to_string();
2524        let stream_key = format!("{lease_key}:block:stream");
2525        let adapter = RedisLeaderLeaseAdapter::new(
2526            vec![redis.redis_url()],
2527            lease_key,
2528            Duration::from_secs(2),
2529            Duration::from_millis(100),
2530            Duration::from_millis(50),
2531            Duration::from_millis(0),
2532            1,
2533            1000,
2534        )
2535        .expect("adapter should be created");
2536        let redis_client =
2537            redis::Client::open(redis.redis_url()).expect("redis client should open");
2538        let mut conn = redis_client
2539            .get_connection()
2540            .expect("redis connection should open");
2541        let h1 = poa_block_at_time(1, 10);
2542        let h2 = poa_block_at_time(2, 20);
2543        let h3 = poa_block_at_time(3, 30);
2544        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
2545        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
2546        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
2547        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
2548        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
2549        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
2550        let redis_node = adapter.redis_nodes[0].clone();
2551
2552        // when
2553        let first_page = adapter
2554            .read_stream_entries_on_node(&redis_node, 1, 2)
2555            .await
2556            .expect("first page should succeed");
2557        let second_page = adapter
2558            .read_stream_entries_on_node(&redis_node, 3, 2)
2559            .await
2560            .expect("second page should succeed");
2561
2562        // then
2563        assert_eq!(first_page.len(), 2, "Expected first page to be capped");
2564        assert_eq!(
2565            u32::from(*first_page[0].2.entity.header().height()),
2566            1,
2567            "Expected first page to start from earliest height"
2568        );
2569        assert_eq!(
2570            u32::from(*first_page[1].2.entity.header().height()),
2571            2,
2572            "Expected first page to include second height"
2573        );
2574        assert_eq!(
2575            second_page.len(),
2576            1,
2577            "Expected height filter to return only matching trailing entry"
2578        );
2579        assert_eq!(
2580            u32::from(*second_page[0].2.entity.header().height()),
2581            3,
2582            "Expected second read to include only the requested next height"
2583        );
2584    }
2585
2586    /// When a partial publish leaves a stale entry at a given height,
2587    /// a subsequent write at the same height is rejected by
2588    /// write_block.lua's HEIGHT_EXISTS check. This prevents two blocks
2589    /// at the same height from coexisting in the stream, which would
2590    /// cause a fork if a different leader also achieved quorum at that
2591    /// height.
2592    #[tokio::test(flavor = "multi_thread")]
2593    async fn partial_publish_then_retry_at_same_height__new_leader_reconciles_stale_block()
2594     {
2595        let redis = RedisTestServer::spawn();
2596        let lease_key = "poa:test:fork-repro".to_string();
2597        let stream_key = format!("{lease_key}:block:stream");
2598
2599        let adapter_a = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
2600        assert!(
2601            adapter_a
2602                .acquire_lease_if_free()
2603                .await
2604                .expect("acquire should succeed"),
2605            "adapter_a should acquire lease"
2606        );
2607        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
2608            .expect("epoch should be set");
2609
2610        // Simulate stale partial publish by writing directly to stream
2611        let block_a = poa_block_at_time(1, 100);
2612        let block_a_data = postcard::to_allocvec(&block_a).expect("serialize block_a");
2613        let redis_client =
2614            redis::Client::open(redis.redis_url()).expect("redis client should open");
2615        let mut conn = redis_client
2616            .get_connection()
2617            .expect("redis connection should open");
2618        append_stream_block(&mut conn, &stream_key, 1, &block_a_data, epoch_a as u32);
2619
2620        // Leader A retries with a different block at the same height —
2621        // this should FAIL because height 1 already exists in the stream.
2622        let block_b = poa_block_at_time(1, 999);
2623        assert_ne!(
2624            block_a.entity.header().time(),
2625            block_b.entity.header().time()
2626        );
2627
2628        let result = adapter_a.publish_produced_block(&block_b);
2629        assert!(
2630            result.is_err(),
2631            "publish at same height should fail due to HEIGHT_EXISTS"
2632        );
2633
2634        // Stream should still have only the original entry
2635        assert_eq!(stream_len(&redis.redis_url(), &stream_key), 1);
2636
2637        adapter_a.release().await.expect("release should succeed");
2638
2639        // A new leader reconciles and sees block_a (the only entry)
2640        let adapter_b = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
2641        assert!(
2642            adapter_b
2643                .acquire_lease_if_free()
2644                .await
2645                .expect("acquire should succeed"),
2646            "adapter_b should acquire lease"
2647        );
2648
2649        let unreconciled = adapter_b
2650            .unreconciled_blocks(1.into())
2651            .await
2652            .expect("reconciliation should succeed");
2653
2654        assert_eq!(unreconciled.len(), 1, "Should reconcile exactly one block");
2655        assert_eq!(
2656            unreconciled[0].entity.header().time(),
2657            block_a.entity.header().time(),
2658            "Reconciliation should return block_a (the only entry in the stream)"
2659        );
2660    }
2661
2662    // ---------------------------------------------------------------------
2663    // Regression tests for the 2026-04-22 mainnet hang.
2664    //
2665    // On v0.47.4 the deployed binary linked `redis 0.27.x`. That release of
2666    // `redis::Client::get_connection_with_timeout` only applied the timeout
2667    // to the TCP connect step; the post-connect handshake pipeline
2668    // (`CLIENT SETINFO LIB-NAME`, `CLIENT SETINFO LIB-VER`) ran without any
2669    // socket-level read/write timeout. A peer that accepted TCP but stopped
2670    // responding caused the call to hang indefinitely. Combined with the
2671    // previous `std::thread::scope`-based `publish_block_on_all_nodes`, one
2672    // stuck per-node thread wedged every block publish forever and halted
2673    // block production on mainnet for ~22 minutes.
2674    //
2675    // The fix in this PR has two parts:
2676    //   1. Bump `redis` to 1.2. Upstream's `connect()` now sets read/write
2677    //      timeouts on the socket BEFORE running the handshake pipeline and
2678    //      clears them afterward.
2679    //   2. Replace `std::thread::scope` in `publish_block_on_all_nodes`
2680    //      with detached `std::thread::spawn` workers reporting into an
2681    //      mpsc channel, returning as soon as `Written` quorum is reached.
2682    //      Stragglers are abandoned.
2683    //
2684    // The first test below targets (1) directly: raw library call against a
2685    // half-alive peer must now be bounded.
2686    //
2687    // The second test targets (2) at the adapter level: with 3 healthy
2688    // redis-server processes + 1 half-alive TCP listener (4 nodes, quorum =
2689    // 3), `publish_produced_block` must complete within a tight wall-clock
2690    // deadline. On pre-fix code this call hangs forever because the scoped
2691    // thread for the half-alive node never exits; on the fixed code the
2692    // publish returns as soon as the 3 healthy nodes acknowledge.
2693    // ---------------------------------------------------------------------
2694
2695    /// Accepts TCP connections and drains incoming bytes into a buffer
2696    /// without ever writing a single byte back. Emulates an ElastiCache /
2697    /// Valkey node in the "half-alive" recovery state observed on 2026-04-22.
2698    fn spawn_halflive_redis_listener() -> u16 {
2699        let listener =
2700            TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2701                .expect("bind ephemeral port");
2702        let port = listener.local_addr().unwrap().port();
2703        thread::spawn(move || {
2704            for incoming in listener.incoming() {
2705                let Ok(mut stream) = incoming else {
2706                    continue;
2707                };
2708                thread::spawn(move || {
2709                    let mut buf = [0u8; 4096];
2710                    while let Ok(n) = stream.read(&mut buf) {
2711                        if n == 0 {
2712                            return;
2713                        }
2714                    }
2715                });
2716            }
2717        });
2718        port
2719    }
2720
2721    /// Library-level regression guard for the upstream `redis 0.27.x` bug.
2722    /// On that version this test would hang indefinitely and the process
2723    /// would need to be killed. On `redis >= 1.2` the call returns an `Err`
2724    /// within roughly `2 * node_timeout` (one timeout per handshake
2725    /// pipeline command: `CLIENT SETINFO LIB-NAME`, `CLIENT SETINFO LIB-VER`).
2726    #[test]
2727    fn redis_get_connection_with_timeout__is_bounded_against_halflive_peer() {
2728        const NODE_TIMEOUT: Duration = Duration::from_secs(1);
2729        const MAX_WAIT: Duration = Duration::from_secs(5);
2730
2731        let port = spawn_halflive_redis_listener();
2732        let url = format!("redis://127.0.0.1:{port}/");
2733        let client = redis::Client::open(url).expect("client open");
2734
2735        let (tx, rx) = mpsc::channel();
2736        thread::spawn(move || {
2737            let start = Instant::now();
2738            let result = client.get_connection_with_timeout(NODE_TIMEOUT);
2739            let _ = tx.send((start.elapsed(), result.is_ok()));
2740        });
2741
2742        match rx.recv_timeout(MAX_WAIT) {
2743            Ok((elapsed, ok)) => {
2744                assert!(
2745                    !ok,
2746                    "expected err against a half-alive peer, got Ok after \
2747                     {elapsed:?}",
2748                );
2749                assert!(
2750                    elapsed <= MAX_WAIT,
2751                    "call took {elapsed:?}, expected to be bounded under \
2752                     {MAX_WAIT:?}",
2753                );
2754            }
2755            Err(_) => {
2756                panic!(
2757                    "REGRESSION: get_connection_with_timeout did not \
2758                     return within {MAX_WAIT:?} against a half-alive \
2759                     peer. The redis-crate upstream bug from 0.27.x has \
2760                     reappeared.",
2761                );
2762            }
2763        }
2764    }
2765
2766    /// Adapter-level regression test for the block-production hang. With 3
2767    /// healthy redis nodes and 1 half-alive TCP listener (4 nodes total,
2768    /// quorum = 3), `publish_produced_block` must complete within a
2769    /// wall-clock deadline much smaller than "forever".
2770    ///
2771    /// On the pre-fix code (`std::thread::scope` waiting for all 4 handles
2772    /// to join) this call hangs until the process is killed. On the fixed
2773    /// code the detached threads report via an mpsc channel and
2774    /// `publish_block_on_all_nodes` returns as soon as the 3 healthy nodes
2775    /// acknowledge `Written`.
2776    #[tokio::test(flavor = "multi_thread")]
2777    async fn publish_produced_block__returns_within_bound_when_one_node_is_half_alive() {
2778        // given
2779        let redis_a = RedisTestServer::spawn();
2780        let redis_b = RedisTestServer::spawn();
2781        let redis_c = RedisTestServer::spawn();
2782        let halflive_port = spawn_halflive_redis_listener();
2783        let lease_key = "poa:test:halflive-publish".to_string();
2784        let urls = vec![
2785            redis_a.redis_url(),
2786            redis_b.redis_url(),
2787            redis_c.redis_url(),
2788            format!("redis://127.0.0.1:{halflive_port}/"),
2789        ];
2790        let adapter = new_test_adapter(urls, lease_key);
2791        assert!(
2792            adapter
2793                .acquire_lease_if_free()
2794                .await
2795                .expect("acquire should succeed"),
2796            "should acquire quorum from 3 healthy nodes out of 4",
2797        );
2798
2799        // when — publish is sync; offload to a blocking thread so we can
2800        // enforce a wall-clock deadline via tokio::time::timeout. Without
2801        // the fix this task never completes.
2802        let block = poa_block_at_time(1, 111);
2803        let publish_adapter = adapter.clone();
2804        let start = Instant::now();
2805        let publish = tokio::task::spawn_blocking(move || {
2806            publish_adapter.publish_produced_block(&block)
2807        });
2808        let deadline = Duration::from_secs(5);
2809        let result = tokio::time::timeout(deadline, publish)
2810            .await
2811            .expect(
2812                "publish_produced_block must return within the deadline — \
2813                 on the pre-fix code it hangs forever against a half-alive \
2814                 peer",
2815            )
2816            .expect("spawn_blocking should not panic");
2817        let elapsed = start.elapsed();
2818
2819        // then
2820        result.expect("publish should succeed: 3/4 healthy nodes is quorum");
2821        assert!(
2822            elapsed < deadline,
2823            "publish took {elapsed:?}, expected well under {deadline:?}",
2824        );
2825    }
2826
2827    struct RedisTestServer {
2828        child: Option<Child>,
2829        port: u16,
2830        redis_url: String,
2831    }
2832
2833    impl RedisTestServer {
2834        fn spawn() -> Self {
2835            let mut server = Self::new_stopped();
2836            server.start();
2837            server
2838        }
2839
2840        fn new_stopped() -> Self {
2841            let port = bind_unused_port();
2842            Self {
2843                child: None,
2844                port,
2845                redis_url: format!("redis://127.0.0.1:{port}/"),
2846            }
2847        }
2848
2849        fn start(&mut self) {
2850            if self.child.is_some() {
2851                return;
2852            }
2853            let child = spawn_redis_server(self.port);
2854            wait_for_redis_ready(self.port);
2855            self.child = Some(child);
2856        }
2857
2858        fn stop(&mut self) {
2859            if let Some(child) = self.child.as_mut() {
2860                let _ = child.kill();
2861                let _ = child.wait();
2862            }
2863            self.child = None;
2864        }
2865
2866        fn redis_url(&self) -> String {
2867            self.redis_url.clone()
2868        }
2869    }
2870
2871    impl Drop for RedisTestServer {
2872        fn drop(&mut self) {
2873            if let Some(child) = self.child.as_mut() {
2874                let _ = child.kill();
2875                let _ = child.wait();
2876            }
2877        }
2878    }
2879
2880    fn bind_unused_port() -> u16 {
2881        let socket =
2882            TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2883                .expect("Should bind an ephemeral port");
2884        let port = socket.local_addr().expect("Should get local addr").port();
2885        drop(socket);
2886        port
2887    }
2888
2889    fn spawn_redis_server(port: u16) -> Child {
2890        Command::new("redis-server")
2891            .arg("--port")
2892            .arg(port.to_string())
2893            .arg("--save")
2894            .arg("")
2895            .arg("--appendonly")
2896            .arg("no")
2897            .arg("--bind")
2898            .arg("127.0.0.1")
2899            .stdout(Stdio::null())
2900            .stderr(Stdio::null())
2901            .spawn()
2902            .expect("Failed to spawn redis-server")
2903    }
2904
2905    fn wait_for_redis_ready(port: u16) {
2906        let addr = SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, port);
2907        let start = std::time::Instant::now();
2908        let timeout = Duration::from_secs(5);
2909        while start.elapsed() < timeout {
2910            if TcpStream::connect(addr).is_ok() {
2911                return;
2912            }
2913            thread::sleep(Duration::from_millis(10));
2914        }
2915        panic!("Redis server did not become ready on port {port}");
2916    }
2917
2918    fn poa_block_at_time(height: u32, timestamp: u64) -> SealedBlock {
2919        let mut block = Block::default();
2920        block.header_mut().set_block_height(height.into());
2921        block
2922            .header_mut()
2923            .set_time(fuel_core_types::tai64::Tai64(timestamp));
2924        block.header_mut().recalculate_metadata();
2925        SealedBlock {
2926            entity: block,
2927            consensus: Consensus::PoA(Default::default()),
2928        }
2929    }
2930
2931    fn append_stream_block(
2932        conn: &mut redis::Connection,
2933        stream_key: &str,
2934        height: u32,
2935        data: &[u8],
2936        epoch: u32,
2937    ) {
2938        let _: String = redis::cmd("XADD")
2939            .arg(stream_key)
2940            .arg("*")
2941            .arg("height")
2942            .arg(height)
2943            .arg("data")
2944            .arg(data)
2945            .arg("epoch")
2946            .arg(epoch)
2947            .arg("timestamp")
2948            .arg(epoch)
2949            .query(conn)
2950            .expect("stream write should succeed");
2951    }
2952
2953    fn new_test_adapter(
2954        redis_urls: Vec<String>,
2955        lease_key: String,
2956    ) -> RedisLeaderLeaseAdapter {
2957        RedisLeaderLeaseAdapter::new(
2958            redis_urls,
2959            lease_key,
2960            Duration::from_secs(2),
2961            Duration::from_millis(100),
2962            Duration::from_millis(50),
2963            Duration::from_millis(0),
2964            1,
2965            1000,
2966        )
2967        .expect("adapter should be created")
2968    }
2969
2970    fn set_epoch(redis_url: &str, epoch_key: &str, epoch: u64) {
2971        let redis_client =
2972            redis::Client::open(redis_url).expect("redis client should open");
2973        let mut conn = redis_client
2974            .get_connection()
2975            .expect("redis connection should open");
2976        let _: () = redis::cmd("SET")
2977            .arg(epoch_key)
2978            .arg(epoch)
2979            .query(&mut conn)
2980            .expect("epoch set should succeed");
2981    }
2982
2983    fn read_epoch(redis_url: &str, epoch_key: &str) -> u64 {
2984        let redis_client =
2985            redis::Client::open(redis_url).expect("redis client should open");
2986        let mut conn = redis_client
2987            .get_connection()
2988            .expect("redis connection should open");
2989        let epoch: Option<u64> = redis::cmd("GET")
2990            .arg(epoch_key)
2991            .query(&mut conn)
2992            .expect("epoch get should succeed");
2993        epoch.expect("epoch should exist")
2994    }
2995
2996    fn set_lease_owner(
2997        redis_url: &str,
2998        lease_key: &str,
2999        owner: &str,
3000        lease_ttl_millis: u64,
3001    ) {
3002        let redis_client =
3003            redis::Client::open(redis_url).expect("redis client should open");
3004        let mut conn = redis_client
3005            .get_connection()
3006            .expect("redis connection should open");
3007        let _: () = redis::cmd("SET")
3008            .arg(lease_key)
3009            .arg(owner)
3010            .arg("PX")
3011            .arg(lease_ttl_millis)
3012            .query(&mut conn)
3013            .expect("lease owner set should succeed");
3014    }
3015
3016    fn read_lease_owner(redis_url: &str, lease_key: &str) -> Option<String> {
3017        let redis_client =
3018            redis::Client::open(redis_url).expect("redis client should open");
3019        let mut conn = redis_client
3020            .get_connection()
3021            .expect("redis connection should open");
3022        redis::cmd("GET")
3023            .arg(lease_key)
3024            .query(&mut conn)
3025            .expect("lease owner get should succeed")
3026    }
3027
3028    fn clear_lease_on_nodes(redis_urls: &[String], lease_key: &str) {
3029        redis_urls.iter().for_each(|redis_url| {
3030            let redis_client = redis::Client::open(redis_url.as_str())
3031                .expect("redis client should open");
3032            let mut conn = redis_client
3033                .get_connection()
3034                .expect("redis connection should open");
3035            let _: () = redis::cmd("DEL")
3036                .arg(lease_key)
3037                .query(&mut conn)
3038                .expect("lease delete should succeed");
3039        });
3040    }
3041
3042    fn stream_len(redis_url: &str, stream_key: &str) -> usize {
3043        let redis_client =
3044            redis::Client::open(redis_url).expect("redis client should open");
3045        let mut conn = redis_client
3046            .get_connection()
3047            .expect("redis connection should open");
3048        redis::cmd("XLEN")
3049            .arg(stream_key)
3050            .query(&mut conn)
3051            .expect("stream length query should succeed")
3052    }
3053
3054    /// When Redis read calls fail on a quorum of nodes,
3055    /// `unreconciled_blocks` must return an error — not silently
3056    /// return an empty list that would let the caller produce
3057    /// a divergent block.
3058    #[tokio::test(flavor = "multi_thread")]
3059    async fn unreconciled_blocks__when_reads_fail_on_quorum_nodes__returns_error() {
3060        // given — 3 Redis nodes, leader A publishes block to all 3
3061        let mut redis_a = RedisTestServer::spawn();
3062        let mut redis_b = RedisTestServer::spawn();
3063        let redis_c = RedisTestServer::spawn();
3064        let lease_key = "poa:test:read-failure-fork".to_string();
3065        let redis_urls = vec![
3066            redis_a.redis_url(),
3067            redis_b.redis_url(),
3068            redis_c.redis_url(),
3069        ];
3070
3071        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
3072        assert!(
3073            adapter_a
3074                .acquire_lease_if_free()
3075                .await
3076                .expect("acquire should succeed"),
3077            "Leader A should acquire lease"
3078        );
3079
3080        let block = poa_block_at_time(1, 100);
3081        adapter_a
3082            .publish_produced_block(&block)
3083            .expect("publish should succeed on all 3 nodes");
3084
3085        // Verify block exists on all 3 nodes
3086        let stream_key = format!("{lease_key}:block:stream");
3087        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
3088        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
3089        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
3090
3091        // Simulate A releasing lease
3092        adapter_a.release().await.expect("release should succeed");
3093
3094        // when — kill 2 of 3 Redis nodes BEFORE new leader reconciles
3095        redis_a.stop();
3096        redis_b.stop();
3097
3098        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3099        // Manually set epoch so we can call unreconciled_blocks directly
3100        {
3101            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
3102            *epoch = Some(99);
3103        }
3104
3105        let result = adapter_b.unreconciled_blocks(1.into()).await;
3106
3107        // then — must return Err, not Ok([])
3108        assert!(
3109            result.is_err(),
3110            "unreconciled_blocks must return error when reads fail on quorum of nodes"
3111        );
3112    }
3113
3114    /// Proves that when a Redis node restarts (losing all in-memory data),
3115    /// a block that was published to exactly quorum nodes drops below quorum
3116    /// and reconciliation cannot find it — enabling a fork.
3117    #[tokio::test(flavor = "multi_thread")]
3118    async fn unreconciled_blocks__when_redis_node_restarts_and_loses_data__drops_block_below_quorum()
3119     {
3120        // given — 3 Redis nodes, leader A publishes block to nodes A and B only
3121        // (simulating a partial publish where node C timed out)
3122        let redis_a = RedisTestServer::spawn();
3123        let mut redis_b = RedisTestServer::spawn();
3124        let redis_c = RedisTestServer::spawn();
3125        let lease_key = "poa:test:data-loss-fork".to_string();
3126        let stream_key = format!("{lease_key}:block:stream");
3127        let redis_urls = vec![
3128            redis_a.redis_url(),
3129            redis_b.redis_url(),
3130            redis_c.redis_url(),
3131        ];
3132
3133        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
3134        assert!(
3135            adapter_a
3136                .acquire_lease_if_free()
3137                .await
3138                .expect("acquire should succeed"),
3139            "Leader A should acquire lease"
3140        );
3141        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
3142            .expect("epoch should be set");
3143
3144        // Publish block to nodes A and B only (simulating node C timeout).
3145        // We write directly to simulate the partial publish that still
3146        // reaches quorum (2/3).
3147        let block = poa_block_at_time(1, 100);
3148        let block_data = postcard::to_allocvec(&block).expect("serialize");
3149
3150        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
3151        let mut conn_a = client_a.get_connection().expect("conn");
3152        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
3153        let mut conn_b = client_b.get_connection().expect("conn");
3154
3155        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, epoch_a as u32);
3156        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, epoch_a as u32);
3157        // Node C has no entry (simulated timeout during publish)
3158
3159        // Verify: block on A and B, not on C
3160        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
3161        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
3162        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 0);
3163
3164        // Confirm reconciliation works BEFORE data loss — quorum=2, both A and B have it
3165        let pre_loss = adapter_a
3166            .unreconciled_blocks(1.into())
3167            .await
3168            .expect("reconciliation should succeed");
3169        assert_eq!(
3170            pre_loss.len(),
3171            1,
3172            "Block should be reconcilable with 2/3 nodes having it"
3173        );
3174
3175        // when — Redis node B restarts (pod eviction / rolling deploy / AMI drift)
3176        // All in-memory data is lost (no persistence configured)
3177        drop(conn_b);
3178        drop(client_b);
3179        redis_b.stop();
3180        redis_b.start();
3181
3182        // Verify node B lost its stream data
3183        assert_eq!(
3184            stream_len(&redis_b.redis_url(), &stream_key),
3185            0,
3186            "Restarted node should have empty stream"
3187        );
3188
3189        // Release A's lease so B can acquire
3190        adapter_a.release().await.expect("release should succeed");
3191
3192        // New leader acquires
3193        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3194        assert!(
3195            adapter_b
3196                .acquire_lease_if_free()
3197                .await
3198                .expect("acquire should succeed"),
3199            "New leader should acquire lease"
3200        );
3201
3202        let post_loss = adapter_b
3203            .unreconciled_blocks(1.into())
3204            .await
3205            .expect("reconciliation should succeed");
3206
3207        // then — repair reproposed the block from node A to node B (now empty)
3208        // and node C, reaching quorum again. The block is recovered.
3209        assert_eq!(
3210            post_loss.len(),
3211            1,
3212            "Repair should recover the block by reproposing from node A to the other nodes"
3213        );
3214    }
3215
3216    /// After an election storm where leader A wins on nodes 1,2 but
3217    /// another candidate held node 3, `has_lease_owner_quorum` should
3218    /// expand the lock to node 3 once it's free. Subsequent block
3219    /// writes then go to all 3 nodes instead of just 2.
3220    #[tokio::test(flavor = "multi_thread")]
3221    async fn has_lease_owner_quorum__expands_lock_to_non_owned_nodes() {
3222        // given — 3 Redis nodes
3223        let redis_a = RedisTestServer::spawn();
3224        let redis_b = RedisTestServer::spawn();
3225        let redis_c = RedisTestServer::spawn();
3226        let lease_key = "poa:test:lock-expansion".to_string();
3227        let stream_key = format!("{lease_key}:block:stream");
3228        let redis_urls = vec![
3229            redis_a.redis_url(),
3230            redis_b.redis_url(),
3231            redis_c.redis_url(),
3232        ];
3233
3234        // Simulate election storm: candidate B grabs node C first
3235        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3236        // Manually acquire on node C only (simulate B winning SET NX on C)
3237        {
3238            let client = redis::Client::open(redis_c.redis_url()).expect("client");
3239            let mut conn = client.get_connection().expect("conn");
3240            let _: () = redis::cmd("SET")
3241                .arg(&lease_key)
3242                .arg(&candidate_b.lease_owner_token)
3243                .arg("PX")
3244                .arg(5000u64)
3245                .query(&mut conn)
3246                .expect("set should succeed");
3247        }
3248
3249        // Leader A acquires — gets nodes A,B but not C (B holds it)
3250        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
3251        assert!(
3252            adapter_a
3253                .acquire_lease_if_free()
3254                .await
3255                .expect("acquire should succeed"),
3256            "Leader A should acquire quorum (2/3)"
3257        );
3258
3259        // Verify A owns nodes A,B but NOT node C
3260        let owns_a = read_lease_owner(&redis_a.redis_url(), &lease_key)
3261            == Some(adapter_a.lease_owner_token.clone());
3262        let owns_b = read_lease_owner(&redis_b.redis_url(), &lease_key)
3263            == Some(adapter_a.lease_owner_token.clone());
3264        let owns_c = read_lease_owner(&redis_c.redis_url(), &lease_key)
3265            == Some(adapter_a.lease_owner_token.clone());
3266        assert!(owns_a && owns_b, "A should own nodes A and B");
3267        assert!(!owns_c, "A should NOT own node C (held by B)");
3268
3269        // Candidate B releases node C (simulating failed-quorum cleanup)
3270        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);
3271        assert!(
3272            read_lease_owner(&redis_c.redis_url(), &lease_key).is_none(),
3273            "Node C should be free after B releases"
3274        );
3275
3276        // when — A calls has_lease_owner_quorum (which now expands)
3277        let has_quorum = adapter_a
3278            .has_lease_owner_quorum()
3279            .await
3280            .expect("quorum check should succeed");
3281        assert!(has_quorum, "A should still have quorum");
3282
3283        // then — A should now own node C too
3284        let owns_c_after = read_lease_owner(&redis_c.redis_url(), &lease_key)
3285            == Some(adapter_a.lease_owner_token.clone());
3286        assert!(owns_c_after, "Lock expansion should have acquired node C");
3287
3288        // Verify writes now go to all 3 nodes
3289        let block = poa_block_at_time(1, 100);
3290        adapter_a
3291            .publish_produced_block(&block)
3292            .expect("publish should succeed");
3293
3294        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
3295        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
3296        assert_eq!(
3297            stream_len(&redis_c.redis_url(), &stream_key),
3298            1,
3299            "Block should be written to expanded node C"
3300        );
3301    }
3302
3303    /// When lock expansion acquires a node with a higher epoch
3304    /// (from election storm drift), the leader adopts the higher epoch
3305    /// so write_block.lua succeeds on all nodes.
3306    #[tokio::test(flavor = "multi_thread")]
3307    async fn has_lease_owner_quorum__adopts_higher_epoch_from_expanded_node() {
3308        // given — 3 Redis nodes
3309        let redis_a = RedisTestServer::spawn();
3310        let redis_b = RedisTestServer::spawn();
3311        let redis_c = RedisTestServer::spawn();
3312        let lease_key = "poa:test:epoch-adoption".to_string();
3313        let epoch_key = format!("{lease_key}:epoch:token");
3314        let stream_key = format!("{lease_key}:block:stream");
3315        let redis_urls = vec![
3316            redis_a.redis_url(),
3317            redis_b.redis_url(),
3318            redis_c.redis_url(),
3319        ];
3320
3321        // Simulate election storm: B promotes on node C (incrementing epoch)
3322        // then fails quorum and releases the lock, leaving epoch drifted
3323        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3324        {
3325            let client = redis::Client::open(redis_c.redis_url()).expect("client");
3326            let mut conn = client.get_connection().expect("conn");
3327            // Simulate B's promote_leader.lua on node C: SET NX + INCR
3328            let _: () = redis::cmd("SET")
3329                .arg(&lease_key)
3330                .arg(&candidate_b.lease_owner_token)
3331                .arg("PX")
3332                .arg(5000u64)
3333                .query(&mut conn)
3334                .expect("set should succeed");
3335            let _: u64 = redis::cmd("INCR")
3336                .arg(&epoch_key)
3337                .query(&mut conn)
3338                .expect("incr should succeed");
3339            // B releases (failed quorum cleanup)
3340        }
3341        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);
3342
3343        // Node C now has epoch=1 but no lock owner
3344        let epoch_c_before = read_epoch(&redis_c.redis_url(), &epoch_key);
3345
3346        // Leader A acquires on all free nodes (A,B,C all free now)
3347        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
3348        assert!(
3349            adapter_a
3350                .acquire_lease_if_free()
3351                .await
3352                .expect("acquire should succeed"),
3353        );
3354        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
3355            .expect("epoch should be set");
3356
3357        // A's epoch should be max across all 3 nodes
3358        // Node C had epoch=1 from B's INCR, then A's promote INCR'd it to 2
3359        // Nodes A,B were at 0, A's promote INCR'd them to 1
3360        // A takes max(1, 1, 2) = 2
3361        assert!(
3362            epoch_a > epoch_c_before,
3363            "Leader's epoch ({epoch_a}) should be > node C's pre-acquisition epoch ({epoch_c_before})"
3364        );
3365
3366        // Verify writes succeed on ALL nodes with the adopted epoch
3367        let block = poa_block_at_time(1, 100);
3368        adapter_a
3369            .publish_produced_block(&block)
3370            .expect("publish should succeed on all 3 nodes");
3371
3372        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
3373        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
3374        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
3375    }
3376
3377    /// Exercises promotion, block write, fencing rejection, repair, and
3378    /// reconciliation, then dumps `encode_metrics()` to verify all PoA
3379    /// metrics appear on the /v1/metrics endpoint with expected values.
3380    #[tokio::test(flavor = "multi_thread")]
3381    async fn metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths() {
3382        // --- setup: 3 Redis nodes ---
3383        let redis_a = RedisTestServer::spawn();
3384        let redis_b = RedisTestServer::spawn();
3385        let redis_c = RedisTestServer::spawn();
3386        let lease_key = "poa:test:metrics-smoke".to_string();
3387        let stream_key = format!("{lease_key}:block:stream");
3388        let redis_urls = vec![
3389            redis_a.redis_url(),
3390            redis_b.redis_url(),
3391            redis_c.redis_url(),
3392        ];
3393
3394        // 1. Promotion (success path)
3395        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3396        assert!(
3397            adapter
3398                .acquire_lease_if_free()
3399                .await
3400                .expect("acquire should succeed"),
3401            "adapter should acquire lease"
3402        );
3403
3404        // 2. Successful block write
3405        let block1 = poa_block_at_time(1, 100);
3406        adapter
3407            .publish_produced_block(&block1)
3408            .expect("publish should succeed");
3409
3410        // 3. HEIGHT_EXISTS — write same height again
3411        let block1_dup = poa_block_at_time(1, 200);
3412        let dup_result = adapter.publish_produced_block(&block1_dup);
3413        assert!(dup_result.is_err(), "duplicate height should fail");
3414
3415        // 4. Fencing rejection — old leader tries to write after handoff
3416        let old_adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3417        // Give old_adapter a stale epoch so it thinks it's leader
3418        {
3419            let mut epoch = old_adapter.current_epoch_token.lock().expect("lock");
3420            *epoch = Some(1);
3421        }
3422        let _zombie = old_adapter.publish_produced_block(&poa_block_at_time(2, 300));
3423
3424        // 5. Reconciliation with sub-quorum repair
3425        //    Put an orphan block on node A only at height 2
3426        let orphan = poa_block_at_time(2, 400);
3427        let orphan_data = postcard::to_allocvec(&orphan).expect("serialize orphan");
3428        let client_a = redis::Client::open(redis_a.redis_url()).expect("redis client");
3429        let mut conn_a = client_a.get_connection().expect("redis connection");
3430        let epoch_val =
3431            (*adapter.current_epoch_token.lock().expect("lock")).expect("epoch set");
3432        #[allow(clippy::cast_possible_truncation)]
3433        let epoch_u32 = epoch_val as u32;
3434        append_stream_block(&mut conn_a, &stream_key, 2, &orphan_data, epoch_u32);
3435
3436        // 6. leader_state triggers reconciliation + repair
3437        let state = adapter
3438            .leader_state(2.into())
3439            .await
3440            .expect("leader_state should succeed");
3441        assert!(
3442            matches!(
3443                state,
3444                LeaderState::UnreconciledBlocks(ref blocks) if !blocks.is_empty()
3445            ),
3446            "Should have unreconciled blocks: {state:?}"
3447        );
3448
3449        // --- encode and verify ---
3450        let encoded =
3451            fuel_core_metrics::encode_metrics().expect("encode_metrics should succeed");
3452
3453        // Print full output for visual inspection
3454        let poa_lines: Vec<&str> =
3455            encoded.lines().filter(|l| l.contains("poa_")).collect();
3456        for line in &poa_lines {
3457            eprintln!("{line}");
3458        }
3459
3460        // Verify all metric names appear.
3461        // Counters get `_total` appended by prometheus-client automatically.
3462        let expected_names = [
3463            "poa_leader_epoch",
3464            "poa_is_leader",
3465            "poa_epoch_max_drift",
3466            "poa_stream_trim_headroom",
3467            "poa_write_block_success_total",
3468            "poa_write_block_height_exists_total",
3469            "poa_write_block_fencing_error_total",
3470            "poa_write_block_error_total",
3471            "poa_repair_success_total",
3472            "poa_promotion_success_total",
3473            "poa_promotion_duration_s",
3474            "poa_write_block_duration_s",
3475            "poa_reconciliation_duration_s",
3476            "poa_connection_reset_total",
3477        ];
3478        for name in &expected_names {
3479            assert!(
3480                encoded.contains(name),
3481                "Metric '{name}' missing from /v1/metrics output"
3482            );
3483        }
3484
3485        // Verify key metrics have non-zero values.
3486        // For counters, the data line is e.g. `poa_write_block_success_total 3`.
3487        // For gauges, it's e.g. `poa_leader_epoch 2`.
3488        // We find the line that starts with the name, excluding sub-metric
3489        // lines (like `_bucket`, `_sum`, `_count`).
3490        let non_zero_metrics = [
3491            "poa_leader_epoch",
3492            "poa_is_leader",
3493            "poa_write_block_success_total",
3494            "poa_promotion_success_total",
3495            "poa_repair_success_total",
3496        ];
3497        for name in &non_zero_metrics {
3498            let metric_line = encoded
3499                .lines()
3500                .find(|l| {
3501                    l.starts_with(name)
3502                        && !l.starts_with(&format!("{name}_"))
3503                        && !l.starts_with('#')
3504                })
3505                .unwrap_or_else(|| panic!("No data line for {name}"));
3506            assert!(
3507                !metric_line.ends_with(" 0"),
3508                "Metric '{name}' should be non-zero, got: {metric_line}"
3509            );
3510        }
3511    }
3512
3513    /// When quorum reads fail during reconciliation, a subsequent call
3514    /// should still be able to read the same backlog entries.
3515    #[tokio::test(flavor = "multi_thread")]
3516    async fn unreconciled_blocks__after_quorum_read_failure_then_backlog_remains_readable()
3517     {
3518        // given — 3 Redis nodes, block published to all 3
3519        let mut redis_a = RedisTestServer::spawn();
3520        let mut redis_b = RedisTestServer::spawn();
3521        let redis_c = RedisTestServer::spawn();
3522        let lease_key = "poa:test:cursor-restore-quorum".to_string();
3523        let stream_key = format!("{lease_key}:block:stream");
3524        let redis_urls = vec![
3525            redis_a.redis_url(),
3526            redis_b.redis_url(),
3527            redis_c.redis_url(),
3528        ];
3529
3530        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3531        assert!(
3532            adapter
3533                .acquire_lease_if_free()
3534                .await
3535                .expect("acquire should succeed"),
3536            "Should acquire lease"
3537        );
3538
3539        let block = poa_block_at_time(1, 100);
3540        adapter
3541            .publish_produced_block(&block)
3542            .expect("publish should succeed on all 3 nodes");
3543        adapter.release().await.expect("release should succeed");
3544
3545        // when — kill 2 nodes so quorum read fails
3546        redis_a.stop();
3547        redis_b.stop();
3548
3549        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
3550        {
3551            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
3552            *epoch = Some(99);
3553        }
3554        let result = adapter_b.unreconciled_blocks(1.into()).await;
3555        assert!(result.is_err(), "Should fail with quorum read failure");
3556
3557        // Restart the killed nodes — all 3 now reachable
3558        redis_a.start();
3559        redis_b.start();
3560
3561        // Re-publish block to the restarted nodes so they have data
3562        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
3563        let mut conn_a = client_a.get_connection().expect("conn");
3564        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
3565        let mut conn_b = client_b.get_connection().expect("conn");
3566        let block_data = postcard::to_allocvec(&block).expect("serialize");
3567        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
3568        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 1);
3569
3570        // then — subsequent call must still see the block on node C
3571        let blocks = adapter_b
3572            .unreconciled_blocks(1.into())
3573            .await
3574            .expect("reconciliation should succeed now");
3575        assert_eq!(
3576            blocks.len(),
3577            1,
3578            "Quorum read failure should not make backlog entries unreadable"
3579        );
3580    }
3581
3582    /// When sub-quorum repair fails, the next reconciliation round
3583    /// should still be able to re-read and retry.
3584    #[tokio::test(flavor = "multi_thread")]
3585    async fn unreconciled_blocks__after_repair_failure_then_backlog_remains_readable() {
3586        // given — 3 Redis nodes, block published to only 1 node (sub-quorum)
3587        let redis_a = RedisTestServer::spawn();
3588        let redis_b = RedisTestServer::spawn();
3589        let redis_c = RedisTestServer::spawn();
3590        let lease_key = "poa:test:cursor-restore-repair".to_string();
3591        let stream_key = format!("{lease_key}:block:stream");
3592        let redis_urls = vec![
3593            redis_a.redis_url(),
3594            redis_b.redis_url(),
3595            redis_c.redis_url(),
3596        ];
3597
3598        let block = poa_block_at_time(1, 100);
3599        let block_data = postcard::to_allocvec(&block).expect("serialize");
3600
3601        // Write block to only node A — sub-quorum (1/3)
3602        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
3603        let mut conn_a = client_a.get_connection().expect("conn");
3604        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
3605
3606        // Adapter without a lease — repair will fail (no lock held)
3607        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
3608        // Set epoch but do NOT acquire lease — repair_sub_quorum_block
3609        // will get FencingRejected or fail to reach quorum
3610        {
3611            let mut epoch = adapter.current_epoch_token.lock().expect("lock");
3612            *epoch = Some(99);
3613        }
3614
3615        // First call reads entries, then repair fails because we don't hold the lock.
3616        let result = adapter.unreconciled_blocks(1.into()).await;
3617        // Repair failure now returns an error because backlog remains unresolved.
3618        assert!(
3619            result.is_err(),
3620            "Should return error when repair fails and backlog remains unresolved"
3621        );
3622
3623        // Now acquire the lease so repair can succeed
3624        assert!(
3625            adapter
3626                .acquire_lease_if_free()
3627                .await
3628                .expect("acquire should succeed"),
3629            "Should acquire lease"
3630        );
3631
3632        // then — second call must still see the sub-quorum block
3633        let blocks = adapter
3634            .unreconciled_blocks(1.into())
3635            .await
3636            .expect("reconciliation should succeed with lock held");
3637        assert_eq!(
3638            blocks.len(),
3639            1,
3640            "Repair failure should not make backlog unreadable on the next round"
3641        );
3642    }
3643}