
fuel_core/service/adapters/consensus_module/poa.rs

use crate::{
    fuel_core_graphql_api::ports::ConsensusModulePort,
    service::adapters::{
        BlockImporterAdapter,
        BlockProducerAdapter,
        P2PAdapter,
        PoAAdapter,
        TxPoolAdapter,
    },
};
use anyhow::anyhow;
use fuel_core_importer::ports::{
    BlockReconciliationWritePort,
    ImporterDatabase,
};
use fuel_core_metrics::poa_metrics::poa_metrics;
use fuel_core_poa::{
    ports::{
        BlockImporter,
        BlockReconciliationReadPort,
        LeaderState,
        P2pPort,
        PredefinedBlocks,
        TransactionPool,
        TransactionsSource,
    },
    service::{
        Mode,
        SharedState,
    },
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::Changes;
use fuel_core_types::{
    blockchain::{
        SealedBlock,
        block::Block,
        primitives::BlockId,
    },
    fuel_types::BlockHeight,
    services::{
        block_importer::{
            BlockImportInfo,
            UncommittedResult as UncommittedImporterResult,
        },
        executor::UncommittedResult,
    },
    tai64::Tai64,
};
use std::{
    collections::HashMap,
    path::{
        Path,
        PathBuf,
    },
    time::Duration,
};
use tokio::{
    sync::{
        Mutex,
        watch,
    },
    time::{
        Instant,
        sleep,
        timeout,
    },
};
use tokio_stream::{
    StreamExt,
    wrappers::BroadcastStream,
};
use tracing::error;

pub mod pre_confirmation_signature;

const CHECK_LEASE_OWNER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/check_lease_owner.lua"
));

const RELEASE_LOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/release_lock.lua"
));

const PROMOTE_LEADER_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/promote_leader.lua"
));

const WRITE_BLOCK_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/write_block.lua"
));

const READ_STREAM_ENTRIES_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_stream_entries.lua"
));

const READ_LATEST_STREAM_ENTRY_SCRIPT: &str = include_str!(concat!(
    env!("CARGO_MANIFEST_DIR"),
    "/redis_leader_lease_adapter_scripts/read_latest_stream_entry.lua"
));

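// Each Lua script above runs atomically on a single Redis node and
// implements one step of the lease protocol: ownership checks, promotion
// that yields an epoch (fencing token), guarded release, and fenced writes
// and reads of the block stream. The scripts live beside this crate and are
// not reproduced here; the Rust side relies only on their observable
// contract: the returned values and the "LOCK_HELD:", "HEIGHT_EXISTS:",
// and "FENCING_ERROR:" error prefixes matched below.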
struct RedisNode {
    redis_client: redis::Client,
    cached_connection: Mutex<Option<redis::aio::MultiplexedConnection>>,
}

impl Clone for RedisNode {
    fn clone(&self) -> Self {
        Self {
            redis_client: self.redis_client.clone(),
            cached_connection: Mutex::new(None),
        }
    }
}

pub struct RedisLeaderLeaseAdapter {
    redis_nodes: Vec<RedisNode>,
    quorum: usize,
    quorum_disruption_budget: u32,
    lease_key: String,
    epoch_key: String,
    block_stream_key: String,
    lease_owner_token: String,
    drop_release_guard: std::sync::Arc<()>,
    current_epoch_token: std::sync::Arc<std::sync::Mutex<Option<u64>>>,
    lease_ttl_millis: u64,
    lease_drift_millis: u64,
    node_timeout: Duration,
    retry_delay_millis: u64,
    max_retry_delay_offset_millis: u64,
    max_attempts: usize,
    stream_max_len: u32,
}

impl Clone for RedisLeaderLeaseAdapter {
    fn clone(&self) -> Self {
        Self {
            redis_nodes: self.redis_nodes.clone(),
            quorum: self.quorum,
            quorum_disruption_budget: self.quorum_disruption_budget,
            lease_key: self.lease_key.clone(),
            epoch_key: self.epoch_key.clone(),
            block_stream_key: self.block_stream_key.clone(),
            lease_owner_token: self.lease_owner_token.clone(),
            drop_release_guard: self.drop_release_guard.clone(),
            current_epoch_token: self.current_epoch_token.clone(),
            lease_ttl_millis: self.lease_ttl_millis,
            lease_drift_millis: self.lease_drift_millis,
            node_timeout: self.node_timeout,
            retry_delay_millis: self.retry_delay_millis,
            max_retry_delay_offset_millis: self.max_retry_delay_offset_millis,
            max_attempts: self.max_attempts,
            stream_max_len: self.stream_max_len,
        }
    }
}

#[derive(Default, Clone)]
pub struct NoopReconciliationAdapter;

#[allow(clippy::large_enum_variant)]
pub enum ReconciliationAdapter {
    Redis(RedisLeaderLeaseAdapter),
    Noop(NoopReconciliationAdapter),
}

impl RedisLeaderLeaseAdapter {
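    /// Quorum is a strict majority of the configured nodes plus an optional
    /// disruption budget, capped at the total node count.
    ///
    /// Worked example: with 5 nodes and a budget of 1, majority = 5 / 2 + 1
    /// = 3 and quorum = min(3 + 1, 5) = 4; with a single node the quorum
    /// is 1.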
    fn calculate_quorum(redis_nodes_len: usize, quorum_disruption_budget: u32) -> usize {
        let majority = redis_nodes_len
            .checked_div(2)
            .unwrap_or(0)
            .saturating_add(1);
        let disruption_budget = usize::try_from(quorum_disruption_budget).unwrap_or(0);
        majority
            .saturating_add(disruption_budget)
            .min(redis_nodes_len)
    }

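    /// Builds the adapter with one client per Redis URL. A minimal usage
    /// sketch (the URL and all tuning values below are illustrative only,
    /// mirroring the values used by the tests at the bottom of this file):
    ///
    /// ```ignore
    /// let adapter = RedisLeaderLeaseAdapter::new(
    ///     vec!["redis://127.0.0.1:6379".to_string()],
    ///     "poa:leader".to_string(),   // lease_key
    ///     Duration::from_secs(2),     // lease_ttl
    ///     Duration::from_millis(100), // node_timeout
    ///     Duration::from_millis(50),  // retry_delay
    ///     Duration::from_millis(25),  // max_retry_delay_offset
    ///     3,                          // max_attempts
    ///     1000,                       // stream_max_len
    /// )?;
    /// ```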
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        redis_urls: Vec<String>,
        lease_key: String,
        lease_ttl: Duration,
        node_timeout: Duration,
        retry_delay: Duration,
        max_retry_delay_offset: Duration,
        max_attempts: u32,
        stream_max_len: u32,
    ) -> anyhow::Result<Self> {
        let redis_nodes = redis_urls
            .into_iter()
            .map(|redis_url| {
                redis::Client::open(redis_url).map(|redis_client| RedisNode {
                    redis_client,
                    cached_connection: Mutex::new(None),
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        if redis_nodes.is_empty() {
            return Err(anyhow!(
                "At least one redis url is required for leader lock"
            ));
        }
        let quorum_disruption_budget = 0u32;
        let quorum = Self::calculate_quorum(redis_nodes.len(), quorum_disruption_budget);
        let lease_ttl_millis = u64::try_from(lease_ttl.as_millis())?;
        let retry_delay_millis = u64::try_from(retry_delay.as_millis())?;
        let max_retry_delay_offset_millis =
            u64::try_from(max_retry_delay_offset.as_millis())?;
        let max_attempts = usize::try_from(max_attempts)?.max(1);
        let lease_owner_token = uuid::Uuid::new_v4().to_string();
        let epoch_key = format!("{lease_key}:epoch:token");
        let block_stream_key = format!("{lease_key}:block:stream");
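        // Drift allowance subtracted from the usable lease window: 1% of
        // the TTL plus 2 ms, in the spirit of the Redlock validity check.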
        let lease_drift_millis = lease_ttl_millis
            .checked_div(100)
            .unwrap_or(0)
            .saturating_add(2);
        Ok(Self {
            redis_nodes,
            quorum,
            quorum_disruption_budget,
            lease_key,
            epoch_key,
            block_stream_key,
            lease_owner_token,
            drop_release_guard: std::sync::Arc::new(()),
            current_epoch_token: std::sync::Arc::new(std::sync::Mutex::new(None)),
            lease_ttl_millis,
            lease_drift_millis,
            node_timeout,
            retry_delay_millis,
            max_retry_delay_offset_millis,
            max_attempts,
            stream_max_len,
        })
    }

    pub fn with_quorum_disruption_budget(
        mut self,
        quorum_disruption_budget: u32,
    ) -> Self {
        self.quorum_disruption_budget = quorum_disruption_budget;
        self.quorum =
            Self::calculate_quorum(self.redis_nodes.len(), quorum_disruption_budget);
        self
    }

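    /// Returns the node's cached multiplexed connection, dialing a new one
    /// on a miss. The cache is re-checked after the dial (done without
    /// holding the lock), so a connection raced in by another task wins and
    /// the freshly dialed one is discarded.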
    async fn multiplexed_connection(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<redis::aio::MultiplexedConnection> {
        if let Some(connection) =
            redis_node.cached_connection.lock().await.as_ref().cloned()
        {
            return Ok(connection);
        }

        let new_connection = timeout(
            self.node_timeout,
            redis_node.redis_client.get_multiplexed_async_connection(),
        )
        .await
        .map_err(|_| anyhow!("Timed out while connecting to redis leader-lock node"))??;
        let mut cached_connection = redis_node.cached_connection.lock().await;
        if let Some(connection) = cached_connection.as_ref().cloned() {
            return Ok(connection);
        }
        *cached_connection = Some(new_connection.clone());
        Ok(new_connection)
    }

    async fn clear_cached_connection(&self, redis_node: &RedisNode) {
        let mut cached_connection = redis_node.cached_connection.lock().await;
        *cached_connection = None;
        poa_metrics().connection_reset_total.inc();
    }

    async fn check_lease_owner_on_node(&self, redis_node: &RedisNode) -> bool {
        let mut connection = match self.multiplexed_connection(redis_node).await {
            Ok(connection) => connection,
            Err(_) => return false,
        };
        let is_owner = timeout(
            self.node_timeout,
            redis::Script::new(CHECK_LEASE_OWNER_SCRIPT)
                .key(&self.lease_key)
                .arg(&self.lease_owner_token)
                .invoke_async::<i32>(&mut connection),
        )
        .await;
        match is_owner {
            Ok(Ok(is_owner)) => is_owner == 1,
            Err(_) | Ok(Err(_)) => {
                self.clear_cached_connection(redis_node).await;
                false
            }
        }
    }

    async fn promote_leader_on_node(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<Option<u64>> {
        let mut connection = match self.multiplexed_connection(redis_node).await {
            Ok(connection) => connection,
            Err(_) => return Ok(None),
        };
        let promoted = timeout(
            self.node_timeout,
            redis::Script::new(PROMOTE_LEADER_SCRIPT)
                .key(&self.lease_key)
                .key(&self.epoch_key)
                .arg(&self.lease_owner_token)
                .arg(self.lease_ttl_millis)
                .invoke_async::<u64>(&mut connection),
        )
        .await;
        match promoted {
            Ok(Ok(token)) => Ok(Some(token)),
            Ok(Err(err)) => {
                if err.to_string().contains("LOCK_HELD:") {
                    return Ok(None);
                }
                self.clear_cached_connection(redis_node).await;
                Ok(None)
            }
            Err(_) => {
                self.clear_cached_connection(redis_node).await;
                Ok(None)
            }
        }
    }

    async fn release_lease_on_node(&self, redis_node: &RedisNode) -> bool {
        let mut connection = match self.multiplexed_connection(redis_node).await {
            Ok(connection) => connection,
            Err(_) => return false,
        };
        let released = timeout(
            self.node_timeout,
            redis::Script::new(RELEASE_LOCK_SCRIPT)
                .key(&self.lease_key)
                .arg(&self.lease_owner_token)
                .invoke_async::<i32>(&mut connection),
        )
        .await;
        match released {
            Ok(Ok(released)) => released == 1,
            Err(_) | Ok(Err(_)) => {
                self.clear_cached_connection(redis_node).await;
                false
            }
        }
    }

    fn quorum_reached(&self, success_count: usize) -> bool {
        success_count >= self.quorum
    }

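    /// How much of the lease TTL remains trustworthy after spending
    /// `elapsed_millis` on acquisition, minus the drift allowance.
    /// Saturates at zero: a lease that took too long to acquire must not
    /// be used.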
    fn calculate_remaining_validity_millis(&self, elapsed_millis: u64) -> u64 {
        self.lease_ttl_millis
            .saturating_sub(elapsed_millis.saturating_add(self.lease_drift_millis))
    }

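    /// Random jitter in `0..=max_retry_delay_offset_millis`, added to the
    /// base retry delay so competing candidates do not retry in lockstep.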
    fn random_retry_delay_offset_millis(&self) -> u64 {
        if self.max_retry_delay_offset_millis == 0 {
            return 0;
        }
        rand::random::<u64>()
            .checked_rem(self.max_retry_delay_offset_millis.saturating_add(1))
            .unwrap_or(0)
    }

    async fn release_lease_on_all_nodes(&self) {
        let _ = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.release_lease_on_node(redis_node)),
        )
        .await;
    }

    async fn delay_next_retry(&self) {
        let retry_delay_millis = self
            .retry_delay_millis
            .saturating_add(self.random_retry_delay_offset_millis());
        sleep(Duration::from_millis(retry_delay_millis)).await;
    }

    async fn has_lease_owner_quorum(&self) -> anyhow::Result<bool> {
        let ownership = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.check_lease_owner_on_node(redis_node)),
        )
        .await;
        let owner_count = ownership.iter().filter(|&&is_owner| is_owner).count();
        if !self.quorum_reached(owner_count) {
            return Ok(false);
        }

        // Best-effort: acquire the lock on nodes we don't own yet.
        // Expands write coverage beyond minimum quorum so block data
        // is replicated to more nodes, improving fault tolerance.
        // If a newly-acquired node returns a higher epoch (from
        // election storm drift), adopt it so write_block.lua uses
        // a consistent epoch across all owned nodes.
        let non_owned: Vec<&RedisNode> = self
            .redis_nodes
            .iter()
            .zip(ownership.iter())
            .filter(|(_, is_owner)| !**is_owner)
            .map(|(node, _)| node)
            .collect();

        if !non_owned.is_empty() {
            let results = futures::future::join_all(
                non_owned
                    .into_iter()
                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
            )
            .await;

            if let Some(max_new) =
                results.into_iter().filter_map(|r| r.ok().flatten()).max()
                && let Ok(mut epoch) = self.current_epoch_token.lock()
            {
                let current = epoch.unwrap_or(0);
                if max_new > current {
                    tracing::debug!(
                        old_epoch = current,
                        new_epoch = max_new,
                        "Adopted higher epoch from lock expansion"
                    );
                    *epoch = Some(max_new);
                }
            }
        }

        Ok(true)
    }

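    /// Redlock-style acquisition: attempt promotion on every node in
    /// parallel and treat the lease as held only if a quorum promoted us
    /// and the remaining validity window is still positive. On failure the
    /// partial acquisitions are released on all nodes and the attempt is
    /// retried with jitter, up to `max_attempts` times.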
    async fn acquire_lease_if_free(&self) -> anyhow::Result<bool> {
        let promotion_start = std::time::Instant::now();
        for attempt_index in 0..self.max_attempts {
            let start = std::time::Instant::now();
            let promoted_nodes = futures::future::join_all(
                self.redis_nodes
                    .iter()
                    .map(|redis_node| self.promote_leader_on_node(redis_node)),
            )
            .await;
            let promoted_tokens = promoted_nodes
                .into_iter()
                .filter_map(|token| token.ok().flatten())
                .collect::<Vec<_>>();
            let acquired_count = promoted_tokens.len();
            let elapsed_millis =
                u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
            let validity_millis =
                self.calculate_remaining_validity_millis(elapsed_millis);
            if self.quorum_reached(acquired_count) && validity_millis > 0 {
                // Record epoch drift across quorum nodes
                if promoted_tokens.len() > 1
                    && let (Some(min_tok), Some(max_tok)) = (
                        promoted_tokens.iter().copied().min(),
                        promoted_tokens.iter().copied().max(),
                    )
                {
                    poa_metrics().epoch_max_drift.set(
                        i64::try_from(max_tok.saturating_sub(min_tok))
                            .unwrap_or(i64::MAX),
                    );
                }
                if let Some(max_token) = promoted_tokens.into_iter().max() {
                    let mut current_epoch_token = self
                        .current_epoch_token
                        .lock()
                        .map_err(|e| anyhow!("epoch token lock poisoned: {}", e))?;
                    *current_epoch_token = Some(max_token);
                    poa_metrics()
                        .leader_epoch
                        .set(i64::try_from(max_token).unwrap_or(i64::MAX));
                }
                poa_metrics().promotion_success_total.inc();
                poa_metrics()
                    .promotion_duration_s
                    .observe(promotion_start.elapsed().as_secs_f64());
                return Ok(true);
            }
            self.release_lease_on_all_nodes().await;
            let is_last_attempt = attempt_index.saturating_add(1) == self.max_attempts;
            if !is_last_attempt {
                self.delay_next_retry().await;
            }
        }
        poa_metrics().promotion_failure_total.inc();
        poa_metrics()
            .promotion_duration_s
            .observe(promotion_start.elapsed().as_secs_f64());
        Ok(false)
    }

    async fn release_lease_on_client(
        redis_client: redis::Client,
        lease_key: String,
        lease_owner_token: String,
        node_timeout: Duration,
    ) {
        let connection = timeout(
            node_timeout,
            redis_client.get_multiplexed_async_connection(),
        )
        .await;
        let mut connection = match connection {
            Ok(Ok(connection)) => connection,
            Err(_) | Ok(Err(_)) => return,
        };
        let _ = timeout(
            node_timeout,
            redis::Script::new(RELEASE_LOCK_SCRIPT)
                .key(lease_key)
                .arg(lease_owner_token)
                .invoke_async::<i32>(&mut connection),
        )
        .await;
    }

    async fn release_lease_on_clients(
        redis_clients: Vec<redis::Client>,
        lease_key: String,
        lease_owner_token: String,
        node_timeout: Duration,
    ) {
        let _ =
            futures::future::join_all(redis_clients.into_iter().map(|redis_client| {
                Self::release_lease_on_client(
                    redis_client,
                    lease_key.clone(),
                    lease_owner_token.clone(),
                    node_timeout,
                )
            }))
            .await;
    }

    fn release_lease_on_clients_sync(
        redis_clients: Vec<redis::Client>,
        lease_key: String,
        lease_owner_token: String,
    ) {
        redis_clients.into_iter().for_each(|redis_client| {
            let Ok(mut connection) = redis_client.get_connection() else {
                return;
            };
            let _ = redis::Script::new(RELEASE_LOCK_SCRIPT)
                .key(&lease_key)
                .arg(&lease_owner_token)
                .invoke::<i32>(&mut connection);
        });
    }

    async fn read_latest_stream_entry_on_node(
        &self,
        redis_node: &RedisNode,
    ) -> anyhow::Result<Option<(u32, String)>> {
        let mut connection = self.multiplexed_connection(redis_node).await?;
        let latest_entry = timeout(
            self.node_timeout,
            redis::Script::new(READ_LATEST_STREAM_ENTRY_SCRIPT)
                .key(&self.block_stream_key)
                .invoke_async::<Vec<String>>(&mut connection),
        )
        .await;
        match latest_entry {
            Err(_) => {
                self.clear_cached_connection(redis_node).await;
                Err(anyhow!(
                    "Timed out reading latest stream entry from Redis node"
                ))
            }
            Ok(Err(e)) => {
                self.clear_cached_connection(redis_node).await;
                Err(anyhow!(
                    "Failed to read latest stream entry from Redis node: {e}"
                ))
            }
            Ok(Ok(entry)) => {
                if entry.len() != 2 {
                    return Ok(None);
                }
                let height = entry[0]
                    .parse::<u32>()
                    .map_err(|e| anyhow!("Invalid latest stream entry height: {e}"))?;
                Ok(Some((height, entry[1].clone())))
            }
        }
    }

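    /// A backlog exists when any responsive node's stream already holds an
    /// entry at or above the next height this node expects to commit.
    /// Answering at all requires a quorum of successful reads, so a
    /// partitioned minority cannot wrongly conclude there is nothing to
    /// reconcile.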
    async fn should_reconcile_from_stream(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<bool> {
        let next_height = u32::from(next_height);
        let latest_results = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.read_latest_stream_entry_on_node(redis_node)),
        )
        .await;
        let mut successful_reads = 0usize;
        let mut failed_count = 0usize;
        let mut nodes_indicating_backlog = 0usize;
        for result in latest_results {
            match result {
                Ok(Some((latest_height, _latest_stream_id))) => {
                    successful_reads = successful_reads.saturating_add(1);
                    if latest_height >= next_height {
                        nodes_indicating_backlog =
                            nodes_indicating_backlog.saturating_add(1);
                    }
                }
                Ok(None) => {
                    successful_reads = successful_reads.saturating_add(1);
                }
                Err(e) => {
                    tracing::warn!("Redis latest stream read failed: {e}");
                    failed_count = failed_count.saturating_add(1);
                }
            }
        }
        if !self.quorum_reached(successful_reads) {
            return Err(anyhow!(
                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
                successful_reads,
                self.redis_nodes.len(),
                failed_count
            ));
        }
        Ok(nodes_indicating_backlog > 0)
    }

    async fn read_stream_entries_on_node(
        &self,
        redis_node: &RedisNode,
        next_height: u32,
        max_entries: usize,
    ) -> anyhow::Result<Vec<(u32, u64, SealedBlock)>> {
        if max_entries == 0 {
            return Ok(Vec::new());
        }

        let mut connection = self.multiplexed_connection(redis_node).await?;
        let count = u32::try_from(max_entries).unwrap_or(u32::MAX);
        let stream_entries = timeout(
            self.node_timeout,
            redis::Script::new(READ_STREAM_ENTRIES_SCRIPT)
                .key(&self.block_stream_key)
                .arg(next_height)
                .arg(count)
                .invoke_async::<Vec<(u32, u64, Vec<u8>, String)>>(&mut connection),
        )
        .await;

        let entries = match stream_entries {
            Err(_) => {
                self.clear_cached_connection(redis_node).await;
                return Err(anyhow!("Timed out reading stream entries from Redis node"));
            }
            Ok(Err(e)) => {
                self.clear_cached_connection(redis_node).await;
                return Err(anyhow!(
                    "Failed to read stream entries from Redis node: {e}"
                ));
            }
            Ok(Ok(entries)) => entries,
        };

        let mut blocks = Vec::new();
        for (height, epoch, bytes, _stream_id) in entries {
            match postcard::from_bytes::<SealedBlock>(&bytes) {
                Ok(block) => blocks.push((height, epoch, block)),
                Err(e) => {
                    tracing::warn!(
                        "Skipping stream entry: failed to deserialize block at height {height}: {e}"
                    );
                }
            }
        }

        Ok(blocks)
    }

    async fn unreconciled_blocks(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<Vec<SealedBlock>> {
        if !self.should_reconcile_from_stream(next_height).await? {
            return Ok(Vec::new());
        }
        let mut reconciled = Vec::new();
        let max_reconcile_blocks_per_round =
            usize::try_from(self.stream_max_len).unwrap_or(usize::MAX);
        let next_height_u32 = u32::from(next_height);
        let read_results =
            futures::future::join_all(self.redis_nodes.iter().map(|redis_node| {
                self.read_stream_entries_on_node(
                    redis_node,
                    next_height_u32,
                    max_reconcile_blocks_per_round,
                )
            }))
            .await;

        let mut successful_reads = Vec::new();
        let mut failed_count = 0usize;
        for result in read_results {
            match result {
                Ok(entries) => successful_reads.push(entries),
                Err(e) => {
                    tracing::warn!("Redis stream read failed: {e}");
                    failed_count = failed_count.saturating_add(1);
                }
            }
        }

        if !self.quorum_reached(successful_reads.len()) {
            return Err(anyhow!(
                "Cannot reconcile: only {}/{} Redis nodes responded ({} failed)",
                successful_reads.len(),
                self.redis_nodes.len(),
                failed_count
            ));
        }

        let blocks_by_node = successful_reads
            .into_iter()
            .map(|entries| {
                entries.into_iter().fold(
                    HashMap::<u32, HashMap<u64, SealedBlock>>::new(),
                    |mut blocks_by_height, (height, epoch, block)| {
                        blocks_by_height
                            .entry(height)
                            .or_default()
                            .insert(epoch, block);
                        blocks_by_height
                    },
                )
            })
            .collect::<Vec<_>>();

        // Compute stream trim headroom: min stream height - local committed height
        let min_stream_height = blocks_by_node
            .iter()
            .flat_map(|blocks_by_height| blocks_by_height.keys().copied())
            .min();
        if let Some(min_h) = min_stream_height {
            let local_committed = i64::from(u32::from(next_height).saturating_sub(1));
            let headroom = i64::from(min_h).saturating_sub(local_committed);
            poa_metrics().stream_trim_headroom.set(headroom);
        }

        let mut current_height = u32::from(next_height);

        for _ in 0..max_reconcile_blocks_per_round {
            let nodes_with_height = blocks_by_node
                .iter()
                .filter(|blocks_by_height| blocks_by_height.contains_key(&current_height))
                .count();

            tracing::debug!(
                "unreconciled_blocks: height={current_height} nodes_with_height={nodes_with_height}/{}",
                blocks_by_node.len()
            );

            if nodes_with_height == 0 {
                if reconciled.is_empty() {
                    return Err(anyhow!(
                        "Backlog unresolved at height {current_height}: \
                         stream indicates backlog but no entries found at next height"
                    ));
                }
                break;
            }

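            // Tally candidate blocks by (epoch, block id) across nodes.
            // The winner is the candidate with the highest epoch (fencing
            // token); its vote count is carried along so quorum can be
            // checked below.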
            let votes = blocks_by_node
                .iter()
                .filter_map(|blocks_by_height| blocks_by_height.get(&current_height))
                .flat_map(|blocks_by_epoch| blocks_by_epoch.iter())
                .fold(
                    HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(),
                    |mut votes, (epoch, block)| {
                        let vote_key = (*epoch, block.entity.id());
                        match votes.get_mut(&vote_key) {
                            Some((count, _)) => {
                                *count = count.saturating_add(1);
                            }
                            None => {
                                votes.insert(vote_key, (1, block.clone()));
                            }
                        }
                        votes
                    },
                );

            let winner = votes
                .into_iter()
                .max_by_key(|((epoch, _), _)| *epoch)
                .map(|(_, (count, block))| (count, block));

            if let Some((count, block)) = winner {
                if self.quorum_reached(count) {
                    // Block already has quorum — reconcile it directly
                    reconciled.push(block);
                } else {
                    // Sub-quorum block: repropose to all nodes to reach quorum.
                    // This repairs orphaned partial writes from failed leaders.
                    // HEIGHT_EXISTS on nodes that already have the block returns
                    // Ok(false), and nodes missing it accept the write.
                    tracing::info!(
                        "Repairing sub-quorum block at height {current_height} \
                         (found on {count}/{} nodes)",
                        blocks_by_node.len()
                    );
                    match self.repair_sub_quorum_block(&block, count) {
                        Ok(true) => {
                            tracing::info!(
                                "Repair succeeded — block at height {current_height} \
                                 now has quorum"
                            );
                            reconciled.push(block);
                        }
                        Ok(false) => {
                            tracing::warn!(
                                "Repair failed to reach quorum at height \
                                 {current_height} — will retry next round"
                            );
                            if reconciled.is_empty() {
                                return Err(anyhow!(
                                    "Backlog unresolved at height {current_height}: \
                                     repair failed to reach quorum"
                                ));
                            }
                            break;
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Repair error at height {current_height}: {e}"
                            );
                            if reconciled.is_empty() {
                                return Err(anyhow!(
                                    "Backlog unresolved at height {current_height}: \
                                     repair error: {e}"
                                ));
                            }
                            break;
                        }
                    }
                }
            } else {
                if reconciled.is_empty() {
                    return Err(anyhow!(
                        "Backlog unresolved at height {current_height}: \
                         no winning block candidate"
                    ));
                }
                break;
            }

            let Some(next) = current_height.checked_add(1) else {
                break;
            };
            current_height = next;
        }

        Ok(reconciled)
    }

    async fn can_produce_block(&self) -> anyhow::Result<bool> {
        tracing::debug!("Checking Redis leader lock");
        if self.has_lease_owner_quorum().await? {
            return Ok(true);
        }
        self.acquire_lease_if_free().await
    }

    async fn release_if_owner(&self) -> anyhow::Result<()> {
        tracing::debug!("Releasing Redis leader lock");
        if !self.has_lease_owner_quorum().await? {
            let mut current_epoch_token = self
                .current_epoch_token
                .lock()
                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
            *current_epoch_token = None;
            return Ok(());
        }

        let releases = futures::future::join_all(
            self.redis_nodes
                .iter()
                .map(|redis_node| self.release_lease_on_node(redis_node)),
        )
        .await;
        let released_count = releases.into_iter().filter(|released| *released).count();
        if self.quorum_reached(released_count) {
            let mut current_epoch_token = self
                .current_epoch_token
                .lock()
                .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?;
            *current_epoch_token = None;
            Ok(())
        } else {
            Err(anyhow!("Failed to release lease on quorum"))
        }
    }

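    /// Fenced write of a single block to one node's stream via
    /// `write_block.lua`. The script is handed the owner token, the epoch,
    /// and `stream_max_len` (presumably to verify the lease and bound the
    /// stream; the script itself is not shown here). Script outcomes map
    /// onto `WriteBlockResult`; only unexpected errors surface as `Err`.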
    fn publish_block_on_node(
        &self,
        redis_node: &RedisNode,
        epoch: u64,
        block: &SealedBlock,
        block_data: &[u8],
    ) -> anyhow::Result<WriteBlockResult> {
        let mut connection = redis_node
            .redis_client
            .get_connection_with_timeout(self.node_timeout)?;
        connection.set_read_timeout(Some(self.node_timeout))?;
        connection.set_write_timeout(Some(self.node_timeout))?;
        let block_height = u32::from(*block.entity.header().height());
        let lua_start = std::time::Instant::now();
        let write_result = redis::Script::new(WRITE_BLOCK_SCRIPT)
            .key(&self.block_stream_key)
            .key(&self.epoch_key)
            .key(&self.lease_key)
            .arg(epoch)
            .arg(&self.lease_owner_token)
            .arg(block_height)
            .arg(block_data)
            .arg(self.lease_ttl_millis)
            .arg(self.stream_max_len)
            .invoke::<String>(&mut connection);
        poa_metrics()
            .write_block_duration_s
            .observe(lua_start.elapsed().as_secs_f64());
        match write_result {
            Ok(_) => {
                poa_metrics().write_block_success_total.inc();
                Ok(WriteBlockResult::Written)
            }
            Err(err) if err.to_string().contains("HEIGHT_EXISTS:") => {
                poa_metrics().write_block_height_exists_total.inc();
                tracing::debug!(
                    "write_block: height already exists (height={block_height})"
                );
                Ok(WriteBlockResult::HeightExists)
            }
            Err(err) if err.to_string().contains("FENCING_ERROR:") => {
                poa_metrics().write_block_fencing_error_total.inc();
                tracing::warn!(
                    "write_block: fencing rejected (height={block_height}): {err}"
                );
                Ok(WriteBlockResult::FencingRejected)
            }
            Err(err) => {
                poa_metrics().write_block_error_total.inc();
                Err(err.into())
            }
        }
    }

    /// Repropose a sub-quorum block to all Redis nodes to reach quorum.
    /// Called during reconciliation when a block exists on some nodes but
    /// below quorum — possibly from a leader that published and committed
    /// locally but whose write only reached a subset of nodes.
    ///
    /// `pre_existing_count` is the number of nodes already confirmed to
    /// have this specific block during the reconciliation read phase.
    ///
    /// Uses `publish_block_on_node` which runs `write_block.lua`:
    /// - Written: node accepted the block (counted toward quorum)
    /// - HEIGHT_EXISTS: node has *some* block at this height — may be a
    ///   different block from a competing partial write, so NOT counted
    /// - FENCING_ERROR: lost the lock — abort the repair
    /// - The total (pre_existing + newly written) must reach quorum
    fn repair_sub_quorum_block(
        &self,
        block: &SealedBlock,
        pre_existing_count: usize,
    ) -> anyhow::Result<bool> {
        let epoch = match *self
            .current_epoch_token
            .lock()
            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
        {
            Some(epoch) => epoch,
            None => {
                return Err(anyhow!(
                    "Cannot repair block because fencing token is not initialized"
                ));
            }
        };
        let block_data = postcard::to_allocvec(block)?;
        // Start from the pre-existing count (nodes already confirmed to
        // have this specific block during reconciliation). Only count
        // newly Written nodes — HeightExists means the node has *some*
        // block at this height, but it might be a different block from
        // a competing leader's partial write.
        let mut total_with_block = pre_existing_count;
        for redis_node in &self.redis_nodes {
            match self.publish_block_on_node(redis_node, epoch, block, &block_data) {
                Ok(WriteBlockResult::Written) => {
                    total_with_block = total_with_block.saturating_add(1);
                }
                Ok(WriteBlockResult::HeightExists) => {
                    // Node has some block at this height — may or may
                    // not be ours. Don't count it; the pre_existing_count
                    // already includes nodes confirmed to have our block.
                }
                Ok(WriteBlockResult::FencingRejected) => {
                    // Lost the lock — repair is invalid, abort
                    return Err(anyhow!(
                        "Lost lock during repair — another leader took over"
                    ));
                }
                Err(err) => {
                    tracing::debug!("Repair write to node failed: {err}");
                }
            }
        }
        let reached_quorum = self.quorum_reached(total_with_block);
        if reached_quorum {
            poa_metrics().repair_success_total.inc();
        } else {
            poa_metrics().repair_failure_total.inc();
        }
        Ok(reached_quorum)
    }
}

/// Result of a `write_block.lua` invocation on a single Redis node.
enum WriteBlockResult {
    /// Block was successfully written to the stream.
    Written,
    /// A block at this height already exists in the stream.
    HeightExists,
    /// Lock lost or epoch is stale — another leader holds the lock.
    FencingRejected,
}

impl PoAAdapter {
    pub fn new(shared_state: Option<SharedState>) -> Self {
        Self { shared_state }
    }

    pub async fn manually_produce_blocks(
        &self,
        start_time: Option<Tai64>,
        mode: Mode,
    ) -> anyhow::Result<()> {
        self.shared_state
            .as_ref()
            .ok_or(anyhow!("The block production is disabled"))?
            .manually_produce_block(start_time, mode)
            .await
    }
}

#[async_trait::async_trait]
impl BlockReconciliationReadPort for NoopReconciliationAdapter {
    async fn leader_state(
        &self,
        _next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        Ok(LeaderState::ReconciledLeader)
    }

    async fn release(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

#[async_trait::async_trait]
impl BlockReconciliationReadPort for RedisLeaderLeaseAdapter {
    async fn leader_state(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        if self.can_produce_block().await? {
            poa_metrics().is_leader.set(1);
            if let Ok(epoch) = self.current_epoch_token.lock()
                && let Some(epoch) = *epoch
            {
                poa_metrics()
                    .leader_epoch
                    .set(i64::try_from(epoch).unwrap_or(i64::MAX));
            }
            let reconcile_start = std::time::Instant::now();
            let unreconciled_blocks = self.unreconciled_blocks(next_height).await?;
            poa_metrics()
                .reconciliation_duration_s
                .observe(reconcile_start.elapsed().as_secs_f64());
            if unreconciled_blocks.is_empty() {
                Ok(LeaderState::ReconciledLeader)
            } else {
                Ok(LeaderState::UnreconciledBlocks(unreconciled_blocks))
            }
        } else {
            poa_metrics().is_leader.set(0);
            Ok(LeaderState::ReconciledFollower)
        }
    }

    async fn release(&self) -> anyhow::Result<()> {
        self.release_if_owner().await
    }
}

#[async_trait::async_trait]
impl BlockReconciliationReadPort for ReconciliationAdapter {
    async fn leader_state(
        &self,
        next_height: BlockHeight,
    ) -> anyhow::Result<LeaderState> {
        match self {
            Self::Redis(adapter) => adapter.leader_state(next_height).await,
            Self::Noop(adapter) => adapter.leader_state(next_height).await,
        }
    }

    async fn release(&self) -> anyhow::Result<()> {
        match self {
            Self::Redis(adapter) => adapter.release().await,
            Self::Noop(adapter) => adapter.release().await,
        }
    }
}

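// Best-effort lease release on drop, performed only by the last surviving
// clone (tracked via `drop_release_guard`). Inside a Tokio runtime the
// release is spawned with a short timeout; outside one it falls back to
// blocking synchronous connections.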
impl Drop for RedisLeaderLeaseAdapter {
    fn drop(&mut self) {
        if std::sync::Arc::strong_count(&self.drop_release_guard) != 1 {
            return;
        }

        let redis_clients = self
            .redis_nodes
            .iter()
            .map(|redis_node| redis_node.redis_client.clone())
            .collect::<Vec<_>>();
        if let Ok(runtime_handle) = tokio::runtime::Handle::try_current() {
            let release_future = timeout(
                Duration::from_millis(100),
                Self::release_lease_on_clients(
                    redis_clients,
                    self.lease_key.clone(),
                    self.lease_owner_token.clone(),
                    self.node_timeout,
                ),
            );
            drop(runtime_handle.spawn(async move {
                if release_future.await.is_err() {
                    error!("Failed to release leader lease: timeout");
                }
            }));
            return;
        }

        Self::release_lease_on_clients_sync(
            redis_clients,
            self.lease_key.clone(),
            self.lease_owner_token.clone(),
        );
    }
}

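// Publishing a produced block succeeds only if a quorum of nodes accepts
// the fenced write; `HeightExists` and `FencingRejected` do not count
// toward the quorum. Genesis is exempt because no fencing token exists yet.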
impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter {
    fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
        let epoch = match *self
            .current_epoch_token
            .lock()
            .map_err(|_| anyhow!("cannot access epoch token, poisoned lock"))?
        {
            Some(epoch) => epoch,
            None => {
                if matches!(
                    block.consensus,
                    fuel_core_types::blockchain::consensus::Consensus::Genesis(_)
                ) {
                    tracing::debug!(
                        "Skipping redis block publish for genesis block because fencing token is not initialized"
                    );
                    return Ok(());
                }
                return Err(anyhow!(
                    "Cannot publish block because fencing token is not initialized"
                ));
            }
        };
        let block_data = postcard::to_allocvec(block)?;
        let successes = self
            .redis_nodes
            .iter()
            .map(|redis_node| {
                match self.publish_block_on_node(redis_node, epoch, block, &block_data) {
                    Ok(WriteBlockResult::Written) => true,
                    Ok(_) => false,
                    Err(err) => {
                        tracing::debug!("Redis publish on node failed: {err}");
                        false
                    }
                }
            })
            .filter(|success| *success)
            .count();
        if self.quorum_reached(successes) {
            Ok(())
        } else {
            Err(anyhow!(
                "Failed to publish block to redis quorum with fencing checks"
            ))
        }
    }
}

#[async_trait::async_trait]
impl ConsensusModulePort for PoAAdapter {
    async fn manually_produce_blocks(
        &self,
        start_time: Option<Tai64>,
        number_of_blocks: u32,
    ) -> anyhow::Result<()> {
        self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks })
            .await
    }
}

#[cfg(feature = "p2p")]
impl P2pPort for P2PAdapter {
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        if let Some(service) = &self.service {
            Box::pin(
                BroadcastStream::new(service.subscribe_reserved_peers_count())
                    .filter_map(|result| result.ok()),
            )
        } else {
            Box::pin(tokio_stream::pending())
        }
    }
}

#[cfg(not(feature = "p2p"))]
impl P2pPort for P2PAdapter {
    fn reserved_peers_count(&self) -> BoxStream<usize> {
        Box::pin(tokio_stream::pending())
    }
}

pub struct InDirectoryPredefinedBlocks {
    path_to_directory: Option<PathBuf>,
}

impl InDirectoryPredefinedBlocks {
    pub fn new(path_to_directory: Option<PathBuf>) -> Self {
        Self { path_to_directory }
    }
}

impl PredefinedBlocks for InDirectoryPredefinedBlocks {
    fn get_block(&self, height: &BlockHeight) -> anyhow::Result<Option<Block>> {
        let Some(path) = &self.path_to_directory else {
            return Ok(None);
        };

        let block_height: u32 = (*height).into();
        if block_exists(path.as_path(), block_height) {
            let block_path = block_path(path.as_path(), block_height);
            let block_bytes = std::fs::read(block_path)?;
            let block: Block = serde_json::from_slice(block_bytes.as_slice())?;
            Ok(Some(block))
        } else {
            Ok(None)
        }
    }
}

pub fn block_path(path_to_directory: &Path, block_height: u32) -> PathBuf {
    path_to_directory.join(format!("{block_height}.json"))
}

pub fn block_exists(path_to_directory: &Path, block_height: u32) -> bool {
    block_path(path_to_directory, block_height).exists()
}

impl TransactionPool for TxPoolAdapter {
    fn new_txs_watcher(&self) -> watch::Receiver<()> {
        self.service.get_new_executable_txs_notifier()
    }
}

#[async_trait::async_trait]
impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter {
    async fn produce_and_execute_block(
        &self,
        height: BlockHeight,
        block_time: Tai64,
        source: TransactionsSource,
        deadline: Instant,
    ) -> anyhow::Result<UncommittedResult<Changes>> {
        match source {
            TransactionsSource::TxPool => {
                self.block_producer
                    .produce_and_execute_block_txpool(height, block_time, deadline)
                    .await
            }
            TransactionsSource::SpecificTransactions(txs) => {
                self.block_producer
                    .produce_and_execute_block_transactions(height, block_time, txs)
                    .await
            }
        }
    }

    async fn produce_predefined_block(
        &self,
        block: &Block,
    ) -> anyhow::Result<UncommittedResult<Changes>> {
        self.block_producer
            .produce_and_execute_predefined(block, ())
            .await
    }
}

#[async_trait::async_trait]
impl BlockImporter for BlockImporterAdapter {
    async fn commit_result(
        &self,
        result: UncommittedImporterResult<Changes>,
    ) -> anyhow::Result<()> {
        self.block_importer
            .commit_result(result)
            .await
            .map_err(Into::into)
    }

    async fn execute_and_commit(&self, block: SealedBlock) -> anyhow::Result<()> {
        self.block_importer
            .execute_and_commit(block)
            .await
            .map_err(Into::into)
    }

    fn block_stream(&self) -> BoxStream<BlockImportInfo> {
        Box::pin(
            BroadcastStream::new(self.block_importer.subscribe())
                .filter_map(|result| result.ok())
                .map(|result| BlockImportInfo::from(result.shared_result)),
        )
    }

    fn latest_block_height(&self) -> anyhow::Result<Option<BlockHeight>> {
        self.database.latest_block_height().map_err(Into::into)
    }
}

#[cfg(all(test, feature = "leader_lock", not(feature = "not_leader_lock")))]
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use fuel_core_importer::ports::BlockReconciliationWritePort;
    use fuel_core_poa::ports::BlockReconciliationReadPort;
    use fuel_core_types::blockchain::consensus::Consensus;
    use std::{
        net::{
            SocketAddrV4,
            TcpListener,
            TcpStream,
        },
        process::{
            Child,
            Command,
            Stdio,
        },
        thread,
        time::Duration,
    };

    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_same_height_has_multiple_stream_entries_then_returns_highest_epoch_block()
     {
        // given
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:stream-conflict".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");

        let low_epoch_block = poa_block_at_time(1, 10);
        let high_epoch_block = poa_block_at_time(1, 20);

        let low_epoch_data =
            postcard::to_allocvec(&low_epoch_block).expect("serialize block");
        let high_epoch_data =
            postcard::to_allocvec(&high_epoch_block).expect("serialize block");

        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &low_epoch_data, 1);
        append_stream_block(&mut conn, &stream_key, 1, &high_epoch_data, 2);

        // when
        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        // then
        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(unreconciled_blocks.len(), 1);
        assert_eq!(
            unreconciled_blocks[0].entity.header().time(),
            high_epoch_block.entity.header().time(),
            "Expected reconciliation to pick the highest epoch block for the same height",
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_same_height_same_epoch_has_multiple_stream_entries_then_keeps_latest_entry()
     {
        // given
        let redis = RedisTestServer::spawn();
        let lease_key = "poa:test:equal-epoch-latest-entry".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = RedisLeaderLeaseAdapter::new(
            vec![redis.redis_url()],
            lease_key,
            Duration::from_secs(2),
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created");

        let stale_block = poa_block_at_time(1, 10);
        let retry_block = poa_block_at_time(1, 20);
        let stale_data =
            postcard::to_allocvec(&stale_block).expect("stale block should serialize");
        let retry_data =
            postcard::to_allocvec(&retry_block).expect("retry block should serialize");

        let redis_client =
            redis::Client::open(redis.redis_url()).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        append_stream_block(&mut conn, &stream_key, 1, &stale_data, 1);
        append_stream_block(&mut conn, &stream_key, 1, &retry_data, 1);

        // when
        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        // then
        let unreconciled_blocks = match leader_state {
            LeaderState::UnreconciledBlocks(blocks) => blocks,
            other => panic!("Expected unreconciled blocks, got: {other:?}"),
        };
        assert_eq!(unreconciled_blocks.len(), 1);
        assert_eq!(
            unreconciled_blocks[0].entity.id(),
            retry_block.entity.id(),
            "Expected reconciliation to keep latest stream entry for equal epoch",
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn leader_state__when_height_has_disagreeing_block_ids_then_repairs_with_highest_epoch_block()
     {
        // given: two different blocks at height 1 on different nodes, same epoch
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-quorum-block-mismatch".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let adapter = new_test_adapter(
            vec![
                redis_a.redis_url(),
                redis_b.redis_url(),
                redis_c.redis_url(),
            ],
            lease_key,
        );
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        let block_a = poa_block_at_time(1, 10);
        let block_b = poa_block_at_time(1, 20);
        let block_a_data =
            postcard::to_allocvec(&block_a).expect("block a should serialize");
        let block_b_data =
            postcard::to_allocvec(&block_b).expect("block b should serialize");

        let redis_a_client =
            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
        let redis_b_client =
            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
        let mut conn_a = redis_a_client
            .get_connection()
            .expect("redis a connection should open");
        let mut conn_b = redis_b_client
            .get_connection()
            .expect("redis b connection should open");

        // Both at epoch 7 but different block data — each on 1 node (sub-quorum)
        append_stream_block(&mut conn_a, &stream_key, 1, &block_a_data, 7);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_b_data, 7);

        // when: leader reconciles — should pick one and repair to quorum
        // The repair writes to node C (empty), giving the winner 2/3
        let leader_state = adapter
            .leader_state(1.into())
            .await
            .expect("leader_state should succeed");

        // then: one of the blocks is repaired and returned
        assert!(
            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
            "Expected repair to pick one block and reach quorum, got {leader_state:?}",
1565        );
1566    }
1567
1568    #[tokio::test(flavor = "multi_thread")]
1569    async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it()
1570     {
1571        // given: orphan block on only 1 of 3 nodes (below quorum)
1572        let redis_a = RedisTestServer::spawn();
1573        let redis_b = RedisTestServer::spawn();
1574        let redis_c = RedisTestServer::spawn();
1575        let lease_key = "poa:test:below-quorum".to_string();
1576        let stream_key = format!("{lease_key}:block:stream");
1577        let adapter = new_test_adapter(
1578            vec![
1579                redis_a.redis_url(),
1580                redis_b.redis_url(),
1581                redis_c.redis_url(),
1582            ],
1583            lease_key,
1584        );
1585        assert!(
1586            adapter
1587                .acquire_lease_if_free()
1588                .await
1589                .expect("acquire should succeed"),
1590            "adapter should acquire lease"
1591        );
1592
1593        let orphan_block = poa_block_at_time(1, 10);
1594        let orphan_block_data =
1595            postcard::to_allocvec(&orphan_block).expect("orphan block should serialize");
1596
1597        let redis_client =
1598            redis::Client::open(redis_a.redis_url()).expect("redis client should open");
1599        let mut conn = redis_client
1600            .get_connection()
1601            .expect("redis connection should open");
1602        append_stream_block(&mut conn, &stream_key, 1, &orphan_block_data, 1);
1603
1604        // when: leader reconciles — should repair the orphan to quorum
1605        let leader_state = adapter
1606            .leader_state(1.into())
1607            .await
1608            .expect("leader_state should succeed");
1609
1610        // then: orphan was reproposed to other nodes and returned for import
1611        assert!(
1612            matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1),
1613            "Expected sub-quorum entry to be repaired and returned, got {leader_state:?}"
1614        );
1615    }
1616
1617    #[tokio::test(flavor = "multi_thread")]
1618    async fn leader_state__when_contiguous_heights_have_quorum_then_repairs_sub_quorum_tail()
1619     {
1620        // given: h1, h2 on quorum (a,b). h3 below quorum (a only).
1621        let redis_a = RedisTestServer::spawn();
1622        let redis_b = RedisTestServer::spawn();
1623        let redis_c = RedisTestServer::spawn();
1624        let lease_key = "poa:test:contiguous-quorum".to_string();
1625        let stream_key = format!("{lease_key}:block:stream");
1626        let adapter = new_test_adapter(
1627            vec![
1628                redis_a.redis_url(),
1629                redis_b.redis_url(),
1630                redis_c.redis_url(),
1631            ],
1632            lease_key,
1633        );
1634        assert!(
1635            adapter
1636                .acquire_lease_if_free()
1637                .await
1638                .expect("acquire should succeed"),
1639            "adapter should acquire lease"
1640        );
1641
1642        let h1 = poa_block_at_time(1, 10);
1643        let h2 = poa_block_at_time(2, 20);
1644        let h3 = poa_block_at_time(3, 30);
1645        let h1_data = postcard::to_allocvec(&h1).expect("h1 should serialize");
1646        let h2_data = postcard::to_allocvec(&h2).expect("h2 should serialize");
1647        let h3_data = postcard::to_allocvec(&h3).expect("h3 should serialize");
1648
1649        let redis_a_client =
1650            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1651        let redis_b_client =
1652            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
1653        let mut conn_a = redis_a_client
1654            .get_connection()
1655            .expect("redis a connection should open");
1656        let mut conn_b = redis_b_client
1657            .get_connection()
1658            .expect("redis b connection should open");
1659
1660        // h1 on quorum (a,b)
1661        append_stream_block(&mut conn_a, &stream_key, 1, &h1_data, 1);
1662        append_stream_block(&mut conn_b, &stream_key, 1, &h1_data, 1);
1663        // h2 on quorum (a,b)
1664        append_stream_block(&mut conn_a, &stream_key, 2, &h2_data, 1);
1665        append_stream_block(&mut conn_b, &stream_key, 2, &h2_data, 1);
1666        // h3 below quorum (a only)
1667        append_stream_block(&mut conn_a, &stream_key, 3, &h3_data, 1);
1668
1669        // when: leader reconciles — h3 should be repaired to quorum
1670        let leader_state = adapter
1671            .leader_state(1.into())
1672            .await
1673            .expect("leader_state should succeed");
1674
1675        // then: all 3 heights returned (h3 was repaired)
1676        let unreconciled_blocks = match leader_state {
1677            LeaderState::UnreconciledBlocks(blocks) => blocks,
1678            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1679        };
1680        assert_eq!(
1681            unreconciled_blocks
1682                .iter()
1683                .map(|b| u32::from(*b.entity.header().height()))
1684                .collect::<Vec<_>>(),
1685            vec![1, 2, 3],
1686            "Expected all heights including repaired sub-quorum h3",
1687        );
1688    }
1689
1690    #[tokio::test(flavor = "multi_thread")]
1691    async fn leader_state__when_contiguous_quorum_blocks_are_present_then_returns_all_available_contiguous_blocks()
1692     {
1693        // given
1694        let redis_a = RedisTestServer::spawn();
1695        let redis_b = RedisTestServer::spawn();
1696        let redis_c = RedisTestServer::spawn();
1697        let lease_key = "poa:test:contiguous-over-128".to_string();
1698        let stream_key = format!("{lease_key}:block:stream");
1699        let adapter = RedisLeaderLeaseAdapter::new(
1700            vec![
1701                redis_a.redis_url(),
1702                redis_b.redis_url(),
1703                redis_c.redis_url(),
1704            ],
1705            lease_key,
1706            Duration::from_secs(2),
1707            Duration::from_millis(100),
1708            Duration::from_millis(50),
1709            Duration::from_millis(0),
1710            1,
1711            1000,
1712        )
1713        .expect("adapter should be created");
1714        let redis_a_client =
1715            redis::Client::open(redis_a.redis_url()).expect("redis a client should open");
1716        let redis_b_client =
1717            redis::Client::open(redis_b.redis_url()).expect("redis b client should open");
        let mut conn_a = redis_a_client
            .get_connection()
            .expect("redis a connection should open");
        let mut conn_b = redis_b_client
            .get_connection()
            .expect("redis b connection should open");
        // Node C is intentionally left untouched: entries land only on the
        // quorum pair (a, b), so no client or connection to node C is needed.
1730
1731        (1_u32..=129_u32).for_each(|height| {
1732            let block = poa_block_at_time(height, u64::from(height));
1733            let block_data =
1734                postcard::to_allocvec(&block).expect("block should serialize");
1735            append_stream_block(&mut conn_a, &stream_key, height, &block_data, 1);
1736            append_stream_block(&mut conn_b, &stream_key, height, &block_data, 1);
1737        });
1738
1739        // when
1740        let leader_state = adapter
1741            .leader_state(1.into())
1742            .await
1743            .expect("leader_state should succeed");
1744
1745        // then
1746        let unreconciled_blocks = match leader_state {
1747            LeaderState::UnreconciledBlocks(blocks) => blocks,
1748            other => panic!("Expected unreconciled blocks, got: {other:?}"),
1749        };
1750        assert_eq!(
1751            unreconciled_blocks.len(),
1752            129,
1753            "Expected all contiguous quorum-backed heights to reconcile in one call",
1754        );
1755    }
1756
1757    #[tokio::test(flavor = "multi_thread")]
1758    async fn publish_produced_block__when_fencing_token_is_uninitialized_then_returns_error()
1759     {
1760        // given
1761        let redis_a = RedisTestServer::spawn();
1762        let redis_b = RedisTestServer::spawn();
1763        let redis_c = RedisTestServer::spawn();
1764        let lease_key = "poa:test:missing-epoch".to_string();
1765        let redis_urls = vec![
1766            redis_a.redis_url(),
1767            redis_b.redis_url(),
1768            redis_c.redis_url(),
1769        ];
1770        let adapter = new_test_adapter(redis_urls, lease_key);
1771        let block = poa_block_at_time(1, 100);
1772
1773        // when
1774        let publish_result = adapter.publish_produced_block(&block);
1775
1776        // then
1777        assert!(
1778            publish_result.is_err(),
1779            "Publish should fail when fencing token is not initialized"
1780        );
1781    }
1782
1783    #[tokio::test(flavor = "multi_thread")]
1784    async fn release__when_adapter_is_not_lease_owner_then_returns_ok() {
1785        // given
1786        let redis_a = RedisTestServer::spawn();
1787        let redis_b = RedisTestServer::spawn();
1788        let redis_c = RedisTestServer::spawn();
1789        let lease_key = "poa:test:release-follower".to_string();
1790        let redis_urls = vec![
1791            redis_a.redis_url(),
1792            redis_b.redis_url(),
1793            redis_c.redis_url(),
1794        ];
1795        let adapter = new_test_adapter(redis_urls, lease_key);
1796
1797        // when
1798        let release_result = adapter.release().await;
1799
1800        // then
1801        assert!(
1802            release_result.is_ok(),
1803            "Release should be idempotent for adapters that do not own quorum lease"
1804        );
1805    }
1806
1807    #[tokio::test(flavor = "multi_thread")]
1808    async fn drop__when_non_last_clone_is_dropped_then_does_not_release_shared_lease() {
1809        // given
1810        let redis_a = RedisTestServer::spawn();
1811        let redis_b = RedisTestServer::spawn();
1812        let redis_c = RedisTestServer::spawn();
1813        let lease_key = "poa:test:drop-non-last-clone".to_string();
1814        let redis_urls = vec![
1815            redis_a.redis_url(),
1816            redis_b.redis_url(),
1817            redis_c.redis_url(),
1818        ];
1819        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
1820        assert!(
1821            adapter
1822                .acquire_lease_if_free()
1823                .await
1824                .expect("acquire should succeed"),
1825            "Adapter should acquire lease"
1826        );
1827        let adapter_clone = adapter.clone();
1828        let owner_token = adapter.lease_owner_token.clone();
1829
1830        // when
1831        drop(adapter_clone);
1832        sleep(Duration::from_millis(50)).await;
1833
1834        // then
1835        let owners = redis_urls
1836            .iter()
1837            .filter(|redis_url| {
1838                read_lease_owner(redis_url, &lease_key).as_deref()
1839                    == Some(owner_token.as_str())
1840            })
1841            .count();
1842        assert!(
1843            owners >= 2,
1844            "Dropping a non-last clone must not release quorum lease ownership"
1845        );
1846    }
1847
1848    #[tokio::test(flavor = "multi_thread")]
1849    async fn leader_state__when_lease_is_free_then_acquires_quorum_ownership() {
1850        // given
1851        let redis_a = RedisTestServer::spawn();
1852        let redis_b = RedisTestServer::spawn();
1853        let redis_c = RedisTestServer::spawn();
1854        let lease_key = "poa:test:acquire-on-leader-state".to_string();
1855        let redis_urls = vec![
1856            redis_a.redis_url(),
1857            redis_b.redis_url(),
1858            redis_c.redis_url(),
1859        ];
1860        let adapter = RedisLeaderLeaseAdapter::new(
1861            redis_urls.clone(),
1862            lease_key.clone(),
1863            Duration::from_millis(500),
1864            Duration::from_millis(100),
1865            Duration::from_millis(50),
1866            Duration::from_millis(0),
1867            1,
1868            1000,
1869        )
1870        .expect("adapter should be created");
1871
1872        // when
1873        let state = adapter
1874            .leader_state(1.into())
1875            .await
1876            .expect("leader_state should succeed");
1877        let owners = redis_urls
1878            .iter()
1879            .filter(|redis_url| {
1880                read_lease_owner(redis_url, &lease_key).as_deref()
1881                    == Some(adapter.lease_owner_token.as_str())
1882            })
1883            .count();
1884
1885        // then
1886        assert!(
1887            matches!(state, LeaderState::ReconciledLeader),
1888            "leader_state should acquire and report leader ownership when lease is free"
1889        );
1890        assert!(
1891            owners >= 2,
1892            "Lease ownership should be present on quorum after acquisition"
1893        );
1894    }
1895
1896    #[tokio::test(flavor = "multi_thread")]
1897    async fn leader_state__when_lease_expires_then_another_adapter_becomes_leader() {
1898        // given
1899        let redis_a = RedisTestServer::spawn();
1900        let redis_b = RedisTestServer::spawn();
1901        let redis_c = RedisTestServer::spawn();
1902        let lease_key = "poa:test:ttl-expiry-handoff".to_string();
1903        let redis_urls = vec![
1904            redis_a.redis_url(),
1905            redis_b.redis_url(),
1906            redis_c.redis_url(),
1907        ];
1908        let first_adapter = RedisLeaderLeaseAdapter::new(
1909            redis_urls.clone(),
1910            lease_key.clone(),
1911            Duration::from_millis(300),
1912            Duration::from_millis(100),
1913            Duration::from_millis(50),
1914            Duration::from_millis(0),
1915            1,
1916            1000,
1917        )
1918        .expect("first adapter should be created");
1919        let second_adapter = RedisLeaderLeaseAdapter::new(
1920            redis_urls.clone(),
1921            lease_key.clone(),
1922            Duration::from_millis(300),
1923            Duration::from_millis(100),
1924            Duration::from_millis(50),
1925            Duration::from_millis(0),
1926            1,
1927            1000,
1928        )
1929        .expect("second adapter should be created");
1930
1931        let first_state = first_adapter
1932            .leader_state(1.into())
1933            .await
1934            .expect("first leader_state should succeed");
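        // Sleep well past the 300ms lease TTL so the first adapter's lease
        // expires on every node; the generous margin absorbs CI scheduling
        // jitter.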
1935        sleep(Duration::from_millis(900)).await;
1936
1937        // when
1938        let second_state = second_adapter
1939            .leader_state(1.into())
1940            .await
1941            .expect("second leader_state should succeed");
1942        let second_owner_count = redis_urls
1943            .iter()
1944            .filter(|redis_url| {
1945                read_lease_owner(redis_url, &lease_key).as_deref()
1946                    == Some(second_adapter.lease_owner_token.as_str())
1947            })
1948            .count();
1949
1950        // then
1951        assert!(
1952            matches!(first_state, LeaderState::ReconciledLeader),
1953            "First adapter should acquire lease initially"
1954        );
1955        assert!(
1956            matches!(second_state, LeaderState::ReconciledLeader),
1957            "Second adapter should become leader after TTL expiry"
1958        );
1959        assert!(
1960            second_owner_count >= 2,
1961            "Second adapter should own lease on quorum nodes after takeover"
1962        );
1963    }
1964
1965    #[tokio::test(flavor = "multi_thread")]
1966    async fn publish_produced_block__when_previous_leader_writes_after_handoff_then_rejects_zombie_write()
1967     {
1968        // given
1969        let redis_a = RedisTestServer::spawn();
1970        let redis_b = RedisTestServer::spawn();
1971        let redis_c = RedisTestServer::spawn();
1972        let lease_key = "poa:test:zombie-leader".to_string();
1973        let redis_urls = vec![
1974            redis_a.redis_url(),
1975            redis_b.redis_url(),
1976            redis_c.redis_url(),
1977        ];
1978        let old_leader = new_test_adapter(redis_urls.clone(), lease_key.clone());
1979        let current_leader = new_test_adapter(redis_urls, lease_key.clone());
1980        let block = poa_block_at_time(1, 111);
1981
1982        assert!(
1983            old_leader
1984                .acquire_lease_if_free()
1985                .await
1986                .expect("acquire should succeed"),
1987            "Old leader should acquire initial lease"
1988        );
1989        clear_lease_on_nodes(
1990            &[
1991                redis_a.redis_url(),
1992                redis_b.redis_url(),
1993                redis_c.redis_url(),
1994            ],
1995            &lease_key,
1996        );
1997        assert!(
1998            current_leader
1999                .acquire_lease_if_free()
2000                .await
2001                .expect("acquire should succeed"),
2002            "Current leader should acquire lease after handoff"
2003        );
2004
2005        // when
2006        let zombie_write = old_leader.publish_produced_block(&block);
2007
2008        // then
2009        assert!(
2010            zombie_write.is_err(),
2011            "Old leader write should be fenced after handoff"
2012        );
2013        let current_state = current_leader
2014            .leader_state(1.into())
2015            .await
2016            .expect("leader_state should succeed");
2017        assert!(
2018            matches!(current_state, LeaderState::ReconciledLeader),
2019            "Zombie partial writes must not be considered committed"
2020        );
2021    }
2022
2023    #[tokio::test(flavor = "multi_thread")]
2024    async fn publish_produced_block__when_epoch_is_behind_on_one_node_then_first_write_heals_epoch()
2025     {
2026        // given
2027        let redis_a = RedisTestServer::spawn();
2028        let redis_b = RedisTestServer::spawn();
2029        let redis_c = RedisTestServer::spawn();
2030        let lease_key = "poa:test:epoch-healing".to_string();
2031        let redis_urls = vec![
2032            redis_a.redis_url(),
2033            redis_b.redis_url(),
2034            redis_c.redis_url(),
2035        ];
2036        let adapter = new_test_adapter(redis_urls, lease_key.clone());
2037        let epoch_key = format!("{lease_key}:epoch:token");
2038        let block = poa_block_at_time(1, 222);
2039        assert!(
2040            adapter
2041                .acquire_lease_if_free()
2042                .await
2043                .expect("acquire should succeed"),
2044            "Adapter should acquire lease"
2045        );
2046        let leader_epoch = (*adapter.current_epoch_token.lock().expect("poisoned lock"))
2047            .expect("epoch should be initialized");
2048        let stale_epoch = leader_epoch.saturating_sub(1);
2049        set_epoch(&redis_a.redis_url(), &epoch_key, stale_epoch);
2050
2051        // when
2052        let publish_result = adapter.publish_produced_block(&block);
2053
2054        // then
2055        assert!(
2056            publish_result.is_ok(),
2057            "Publish should still succeed on quorum"
2058        );
2059        let healed_epoch = read_epoch(&redis_a.redis_url(), &epoch_key);
2060        assert_eq!(
2061            healed_epoch, leader_epoch,
2062            "First successful write should heal lagging epoch"
2063        );
2064    }
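
    // A minimal sketch of the epoch rule the two tests above exercise, as we
    // understand it: a write tagged with an epoch older than the node's is
    // fenced (rejected), while a write tagged with a newer epoch heals the
    // node's lagging epoch forward. Illustrative only; the authoritative
    // logic lives in write_block.lua, and this helper is not part of the
    // adapter's API.
    #[allow(dead_code)]
    fn apply_write_epoch(
        node_epoch: &mut u64,
        write_epoch: u64,
    ) -> Result<(), &'static str> {
        if write_epoch < *node_epoch {
            // Zombie leader: its fencing token predates the current leader's.
            return Err("stale epoch: write is fenced");
        }
        // A write from a newer epoch heals a lagging node's epoch.
        *node_epoch = write_epoch;
        Ok(())
    }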
2065
2066    #[tokio::test(flavor = "multi_thread")]
2067    async fn publish_produced_block__when_write_succeeds_then_extends_lease_ttl() {
2068        // given
2069        let redis_a = RedisTestServer::spawn();
2070        let redis_b = RedisTestServer::spawn();
2071        let redis_c = RedisTestServer::spawn();
2072        let lease_key = "poa:test:publish-extends-lease-ttl".to_string();
2073        let redis_urls = vec![
2074            redis_a.redis_url(),
2075            redis_b.redis_url(),
2076            redis_c.redis_url(),
2077        ];
2078        let adapter = RedisLeaderLeaseAdapter::new(
2079            redis_urls.clone(),
2080            lease_key.clone(),
2081            Duration::from_millis(700),
2082            Duration::from_millis(100),
2083            Duration::from_millis(50),
2084            Duration::from_millis(0),
2085            1,
2086            1000,
2087        )
2088        .expect("adapter should be created");
2089        let block = poa_block_at_time(1, 444);
2090        assert!(
2091            adapter
2092                .acquire_lease_if_free()
2093                .await
2094                .expect("acquire should succeed"),
2095            "Adapter should acquire lease"
2096        );
2097        sleep(Duration::from_millis(500)).await;
2098
2099        // when
2100        let publish_result = adapter.publish_produced_block(&block);
2101        sleep(Duration::from_millis(400)).await;
2102        let owners = redis_urls
2103            .iter()
2104            .filter(|redis_url| {
2105                read_lease_owner(redis_url, &lease_key).as_deref()
2106                    == Some(adapter.lease_owner_token.as_str())
2107            })
2108            .count();
2109
2110        // then
2111        assert!(publish_result.is_ok(), "Publish should succeed on quorum");
2112        assert!(
2113            owners >= 2,
2114            "Successful write should extend lease TTL on quorum beyond original window"
2115        );
2116    }
2117
2118    #[tokio::test(flavor = "multi_thread")]
2119    async fn publish_produced_block__when_write_succeeds_on_less_than_quorum_then_entry_is_not_reconciled()
2120     {
2121        // given
2122        let redis_a = RedisTestServer::spawn();
2123        let redis_b = RedisTestServer::spawn();
2124        let redis_c = RedisTestServer::spawn();
2125        let lease_key = "poa:test:partial-write".to_string();
2126        let stream_key = format!("{lease_key}:block:stream");
2127        let redis_urls = vec![
2128            redis_a.redis_url(),
2129            redis_b.redis_url(),
2130            redis_c.redis_url(),
2131        ];
2132        let adapter = new_test_adapter(redis_urls, lease_key.clone());
2133        let block = poa_block_at_time(1, 333);
2134        assert!(
2135            adapter
2136                .acquire_lease_if_free()
2137                .await
2138                .expect("acquire should succeed"),
2139            "Adapter should acquire lease"
2140        );
2141        set_lease_owner(
2142            &redis_b.redis_url(),
2143            &lease_key,
2144            "other-owner",
2145            adapter.lease_ttl_millis,
2146        );
2147        set_lease_owner(
2148            &redis_c.redis_url(),
2149            &lease_key,
2150            "other-owner",
2151            adapter.lease_ttl_millis,
2152        );
2153
2154        // when
2155        let publish_result = adapter.publish_produced_block(&block);
2156        let unreconciled = adapter.unreconciled_blocks(1.into()).await;
2157
2158        // then
2159        assert!(
2160            publish_result.is_err(),
2161            "Publish must fail when fewer than quorum nodes accept write"
2162        );
2163        assert!(
2164            unreconciled.is_err(),
2165            "Unresolved backlog should return an error instead of empty result"
2166        );
2167        assert_eq!(
2168            stream_len(&redis_a.redis_url(), &stream_key),
2169            1,
2170            "One orphan entry should exist on the single successful node"
2171        );
2172    }
2173
2174    #[tokio::test(flavor = "multi_thread")]
2175    async fn unreconciled_blocks__when_quorum_latest_height_is_below_next_height_then_returns_empty()
2176     {
2177        // given
2178        let redis = RedisTestServer::spawn();
2179        let lease_key = "poa:test:cursor-fast-path".to_string();
2180        let stream_key = format!("{lease_key}:block:stream");
2181        let adapter = RedisLeaderLeaseAdapter::new(
2182            vec![redis.redis_url()],
2183            lease_key,
2184            Duration::from_secs(2),
2185            Duration::from_millis(100),
2186            Duration::from_millis(50),
2187            Duration::from_millis(0),
2188            1,
2189            1000,
2190        )
2191        .expect("adapter should be created");
2192        let block = poa_block_at_time(1, 10);
2193        let block_data = postcard::to_allocvec(&block).expect("serialize block");
2194        let redis_client =
2195            redis::Client::open(redis.redis_url()).expect("redis client should open");
2196        let mut conn = redis_client
2197            .get_connection()
2198            .expect("redis connection should open");
2199        append_stream_block(&mut conn, &stream_key, 1, &block_data, 1);
2200
2201        // when
2202        let blocks = adapter
2203            .unreconciled_blocks(2.into())
2204            .await
2205            .expect("reconciliation read should succeed");
2206
2207        // then
2208        assert!(
2209            blocks.is_empty(),
2210            "Expected fast path to skip full reconciliation"
2211        );
2212    }
2213
2214    #[tokio::test(flavor = "multi_thread")]
2215    async fn read_stream_entries_on_node__when_next_height_is_provided_then_reads_matching_entries()
2216     {
2217        // given
2218        let redis = RedisTestServer::spawn();
2219        let lease_key = "poa:test:cursor-incremental-read".to_string();
2220        let stream_key = format!("{lease_key}:block:stream");
2221        let adapter = RedisLeaderLeaseAdapter::new(
2222            vec![redis.redis_url()],
2223            lease_key,
2224            Duration::from_secs(2),
2225            Duration::from_millis(100),
2226            Duration::from_millis(50),
2227            Duration::from_millis(0),
2228            1,
2229            1000,
2230        )
2231        .expect("adapter should be created");
2232        let redis_client =
2233            redis::Client::open(redis.redis_url()).expect("redis client should open");
2234        let mut conn = redis_client
2235            .get_connection()
2236            .expect("redis connection should open");
2237        let h1 = poa_block_at_time(1, 10);
2238        let h2 = poa_block_at_time(2, 20);
2239        let h3 = poa_block_at_time(3, 30);
2240        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
2241        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
2242        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
2243        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
2244        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
2245        let redis_node = adapter.redis_nodes[0].clone();
2246
2247        // when
2248        let first_read = adapter
2249            .read_stream_entries_on_node(&redis_node, 1, 1000)
2250            .await
2251            .expect("first read should succeed");
2252        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
2253        let second_read = adapter
2254            .read_stream_entries_on_node(&redis_node, 3, 1000)
2255            .await
2256            .expect("second read should succeed");
2257
2258        // then
2259        assert_eq!(
2260            first_read.len(),
2261            2,
2262            "Expected initial read to include existing entries"
2263        );
2264        assert_eq!(
2265            second_read.len(),
2266            1,
2267            "Expected height-filtered read to include only matching entries"
2268        );
2269        assert_eq!(
2270            u32::from(*second_read[0].2.entity.header().height()),
2271            3,
2272            "Expected only the requested next height"
2273        );
2274    }
2275
2276    #[tokio::test(flavor = "multi_thread")]
2277    async fn read_stream_entries_on_node__when_max_entries_is_small_then_caps_results() {
2278        // given
2279        let redis = RedisTestServer::spawn();
2280        let lease_key = "poa:test:cursor-pagination".to_string();
2281        let stream_key = format!("{lease_key}:block:stream");
2282        let adapter = RedisLeaderLeaseAdapter::new(
2283            vec![redis.redis_url()],
2284            lease_key,
2285            Duration::from_secs(2),
2286            Duration::from_millis(100),
2287            Duration::from_millis(50),
2288            Duration::from_millis(0),
2289            1,
2290            1000,
2291        )
2292        .expect("adapter should be created");
2293        let redis_client =
2294            redis::Client::open(redis.redis_url()).expect("redis client should open");
2295        let mut conn = redis_client
2296            .get_connection()
2297            .expect("redis connection should open");
2298        let h1 = poa_block_at_time(1, 10);
2299        let h2 = poa_block_at_time(2, 20);
2300        let h3 = poa_block_at_time(3, 30);
2301        let h1_data = postcard::to_allocvec(&h1).expect("serialize block");
2302        let h2_data = postcard::to_allocvec(&h2).expect("serialize block");
2303        let h3_data = postcard::to_allocvec(&h3).expect("serialize block");
2304        append_stream_block(&mut conn, &stream_key, 1, &h1_data, 1);
2305        append_stream_block(&mut conn, &stream_key, 2, &h2_data, 1);
2306        append_stream_block(&mut conn, &stream_key, 3, &h3_data, 1);
2307        let redis_node = adapter.redis_nodes[0].clone();
2308
2309        // when
2310        let first_page = adapter
2311            .read_stream_entries_on_node(&redis_node, 1, 2)
2312            .await
2313            .expect("first page should succeed");
2314        let second_page = adapter
2315            .read_stream_entries_on_node(&redis_node, 3, 2)
2316            .await
2317            .expect("second page should succeed");
2318
2319        // then
2320        assert_eq!(first_page.len(), 2, "Expected first page to be capped");
2321        assert_eq!(
2322            u32::from(*first_page[0].2.entity.header().height()),
2323            1,
2324            "Expected first page to start from earliest height"
2325        );
2326        assert_eq!(
2327            u32::from(*first_page[1].2.entity.header().height()),
2328            2,
2329            "Expected first page to include second height"
2330        );
2331        assert_eq!(
2332            second_page.len(),
2333            1,
2334            "Expected height filter to return only matching trailing entry"
2335        );
2336        assert_eq!(
2337            u32::from(*second_page[0].2.entity.header().height()),
2338            3,
2339            "Expected second read to include only the requested next height"
2340        );
2341    }
2342
2343    /// When a partial publish leaves a stale entry at a given height,
2344    /// a subsequent write at the same height is rejected by
2345    /// write_block.lua's HEIGHT_EXISTS check. This prevents two blocks
2346    /// at the same height from coexisting in the stream, which would
2347    /// cause a fork if a different leader also achieved quorum at that
2348    /// height.
2349    #[tokio::test(flavor = "multi_thread")]
2350    async fn partial_publish_then_retry_at_same_height__new_leader_reconciles_stale_block()
2351     {
2352        let redis = RedisTestServer::spawn();
2353        let lease_key = "poa:test:fork-repro".to_string();
2354        let stream_key = format!("{lease_key}:block:stream");
2355
2356        let adapter_a = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
2357        assert!(
2358            adapter_a
2359                .acquire_lease_if_free()
2360                .await
2361                .expect("acquire should succeed"),
2362            "adapter_a should acquire lease"
2363        );
2364        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
2365            .expect("epoch should be set");
2366
2367        // Simulate stale partial publish by writing directly to stream
2368        let block_a = poa_block_at_time(1, 100);
2369        let block_a_data = postcard::to_allocvec(&block_a).expect("serialize block_a");
2370        let redis_client =
2371            redis::Client::open(redis.redis_url()).expect("redis client should open");
2372        let mut conn = redis_client
2373            .get_connection()
2374            .expect("redis connection should open");
2375        append_stream_block(&mut conn, &stream_key, 1, &block_a_data, epoch_a as u32);
2376
2377        // Leader A retries with a different block at the same height —
2378        // this should FAIL because height 1 already exists in the stream.
2379        let block_b = poa_block_at_time(1, 999);
2380        assert_ne!(
2381            block_a.entity.header().time(),
2382            block_b.entity.header().time()
2383        );
2384
2385        let result = adapter_a.publish_produced_block(&block_b);
2386        assert!(
2387            result.is_err(),
2388            "publish at same height should fail due to HEIGHT_EXISTS"
2389        );
2390
2391        // Stream should still have only the original entry
2392        assert_eq!(stream_len(&redis.redis_url(), &stream_key), 1);
2393
2394        adapter_a.release().await.expect("release should succeed");
2395
2396        // A new leader reconciles and sees block_a (the only entry)
2397        let adapter_b = new_test_adapter(vec![redis.redis_url()], lease_key.clone());
2398        assert!(
2399            adapter_b
2400                .acquire_lease_if_free()
2401                .await
2402                .expect("acquire should succeed"),
2403            "adapter_b should acquire lease"
2404        );
2405
2406        let unreconciled = adapter_b
2407            .unreconciled_blocks(1.into())
2408            .await
2409            .expect("reconciliation should succeed");
2410
2411        assert_eq!(unreconciled.len(), 1, "Should reconcile exactly one block");
2412        assert_eq!(
2413            unreconciled[0].entity.header().time(),
2414            block_a.entity.header().time(),
2415            "Reconciliation should return block_a (the only entry in the stream)"
2416        );
2417    }
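
    // A minimal sketch of probing a node's stream for an existing height,
    // which is the condition write_block.lua's HEIGHT_EXISTS check rejects.
    // Illustrative only: the helper name is ours and it assumes the field
    // layout written by append_stream_block below.
    #[allow(dead_code)]
    fn stream_has_height(redis_url: &str, stream_key: &str, height: u32) -> bool {
        let redis_client =
            redis::Client::open(redis_url).expect("redis client should open");
        let mut conn = redis_client
            .get_connection()
            .expect("redis connection should open");
        // XRANGE yields (id, fields) pairs; fields parse into a map of raw
        // redis values keyed by field name.
        let entries: Vec<(String, std::collections::HashMap<String, redis::Value>)> =
            redis::cmd("XRANGE")
                .arg(stream_key)
                .arg("-")
                .arg("+")
                .query(&mut conn)
                .expect("stream range query should succeed");
        entries.iter().any(|(_, fields)| {
            fields
                .get("height")
                .and_then(|value| redis::from_redis_value::<u32>(value).ok())
                == Some(height)
        })
    }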
2418
2419    struct RedisTestServer {
2420        child: Option<Child>,
2421        port: u16,
2422        redis_url: String,
2423    }
2424
2425    impl RedisTestServer {
2426        fn spawn() -> Self {
2427            let mut server = Self::new_stopped();
2428            server.start();
2429            server
2430        }
2431
2432        fn new_stopped() -> Self {
2433            let port = bind_unused_port();
2434            Self {
2435                child: None,
2436                port,
2437                redis_url: format!("redis://127.0.0.1:{port}/"),
2438            }
2439        }
2440
2441        fn start(&mut self) {
2442            if self.child.is_some() {
2443                return;
2444            }
2445            let child = spawn_redis_server(self.port);
2446            wait_for_redis_ready(self.port);
2447            self.child = Some(child);
2448        }
2449
2450        fn stop(&mut self) {
2451            if let Some(child) = self.child.as_mut() {
2452                let _ = child.kill();
2453                let _ = child.wait();
2454            }
2455            self.child = None;
2456        }
2457
2458        fn redis_url(&self) -> String {
2459            self.redis_url.clone()
2460        }
2461    }
2462
    impl Drop for RedisTestServer {
        fn drop(&mut self) {
            // Reuse stop() rather than duplicating the kill/wait logic.
            self.stop();
        }
    }
2471
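    // Note: binding an ephemeral port and immediately dropping the listener
    // is inherently racy; another process could claim the port before
    // redis-server binds it. The window is tiny and acceptable for tests;
    // wait_for_redis_ready only checks TCP reachability, so a lost race
    // would surface later as failing Redis commands.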
2472    fn bind_unused_port() -> u16 {
2473        let socket =
2474            TcpListener::bind(SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
2475                .expect("Should bind an ephemeral port");
2476        let port = socket.local_addr().expect("Should get local addr").port();
2477        drop(socket);
2478        port
2479    }
2480
2481    fn spawn_redis_server(port: u16) -> Child {
2482        Command::new("redis-server")
2483            .arg("--port")
2484            .arg(port.to_string())
2485            .arg("--save")
2486            .arg("")
2487            .arg("--appendonly")
2488            .arg("no")
2489            .arg("--bind")
2490            .arg("127.0.0.1")
2491            .stdout(Stdio::null())
2492            .stderr(Stdio::null())
2493            .spawn()
2494            .expect("Failed to spawn redis-server")
2495    }
2496
2497    fn wait_for_redis_ready(port: u16) {
2498        let addr = SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, port);
2499        let start = std::time::Instant::now();
2500        let timeout = Duration::from_secs(5);
2501        while start.elapsed() < timeout {
2502            if TcpStream::connect(addr).is_ok() {
2503                return;
2504            }
2505            thread::sleep(Duration::from_millis(10));
2506        }
2507        panic!("Redis server did not become ready on port {port}");
2508    }
2509
2510    fn poa_block_at_time(height: u32, timestamp: u64) -> SealedBlock {
2511        let mut block = Block::default();
2512        block.header_mut().set_block_height(height.into());
2513        block
2514            .header_mut()
2515            .set_time(fuel_core_types::tai64::Tai64(timestamp));
2516        block.header_mut().recalculate_metadata();
2517        SealedBlock {
2518            entity: block,
2519            consensus: Consensus::PoA(Default::default()),
2520        }
2521    }
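
    // Small convenience mirroring the `postcard::to_allocvec` serialization
    // spelled out inline throughout the tests above; the helper name is
    // ours, not part of the adapter's API.
    #[allow(dead_code)]
    fn serialize_block(block: &SealedBlock) -> Vec<u8> {
        postcard::to_allocvec(block).expect("block should serialize")
    }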
2522
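    // Appends a raw stream entry with the fields these tests rely on:
    // height, serialized block data, and epoch. The trailing timestamp
    // field is filled with the epoch value; reconciliation in these tests
    // keys off height and epoch only, so no real wall-clock timestamp is
    // needed (a test-helper shortcut, not a production invariant). Test
    // epochs always fit in u32, which is why callers cast u64 epochs with
    // `as u32`.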
2523    fn append_stream_block(
2524        conn: &mut redis::Connection,
2525        stream_key: &str,
2526        height: u32,
2527        data: &[u8],
2528        epoch: u32,
2529    ) {
2530        let _: String = redis::cmd("XADD")
2531            .arg(stream_key)
2532            .arg("*")
2533            .arg("height")
2534            .arg(height)
2535            .arg("data")
2536            .arg(data)
2537            .arg("epoch")
2538            .arg(epoch)
2539            .arg("timestamp")
2540            .arg(epoch)
2541            .query(conn)
2542            .expect("stream write should succeed");
2543    }
2544
2545    fn new_test_adapter(
2546        redis_urls: Vec<String>,
2547        lease_key: String,
2548    ) -> RedisLeaderLeaseAdapter {
2549        RedisLeaderLeaseAdapter::new(
2550            redis_urls,
2551            lease_key,
2552            Duration::from_secs(2),
2553            Duration::from_millis(100),
2554            Duration::from_millis(50),
2555            Duration::from_millis(0),
2556            1,
2557            1000,
2558        )
2559        .expect("adapter should be created")
2560    }
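
    // Convenience sketch mirroring new_test_adapter but with a
    // caller-supplied lease TTL; the TTL-sensitive tests above spell the
    // constructor out inline instead. The helper name is ours, not part of
    // the adapter's API.
    #[allow(dead_code)]
    fn new_test_adapter_with_lease_ttl(
        redis_urls: Vec<String>,
        lease_key: String,
        lease_ttl: Duration,
    ) -> RedisLeaderLeaseAdapter {
        RedisLeaderLeaseAdapter::new(
            redis_urls,
            lease_key,
            lease_ttl,
            Duration::from_millis(100),
            Duration::from_millis(50),
            Duration::from_millis(0),
            1,
            1000,
        )
        .expect("adapter should be created")
    }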
2561
2562    fn set_epoch(redis_url: &str, epoch_key: &str, epoch: u64) {
2563        let redis_client =
2564            redis::Client::open(redis_url).expect("redis client should open");
2565        let mut conn = redis_client
2566            .get_connection()
2567            .expect("redis connection should open");
2568        let _: () = redis::cmd("SET")
2569            .arg(epoch_key)
2570            .arg(epoch)
2571            .query(&mut conn)
2572            .expect("epoch set should succeed");
2573    }
2574
2575    fn read_epoch(redis_url: &str, epoch_key: &str) -> u64 {
2576        let redis_client =
2577            redis::Client::open(redis_url).expect("redis client should open");
2578        let mut conn = redis_client
2579            .get_connection()
2580            .expect("redis connection should open");
2581        let epoch: Option<u64> = redis::cmd("GET")
2582            .arg(epoch_key)
2583            .query(&mut conn)
2584            .expect("epoch get should succeed");
2585        epoch.expect("epoch should exist")
2586    }
2587
2588    fn set_lease_owner(
2589        redis_url: &str,
2590        lease_key: &str,
2591        owner: &str,
2592        lease_ttl_millis: u64,
2593    ) {
2594        let redis_client =
2595            redis::Client::open(redis_url).expect("redis client should open");
2596        let mut conn = redis_client
2597            .get_connection()
2598            .expect("redis connection should open");
2599        let _: () = redis::cmd("SET")
2600            .arg(lease_key)
2601            .arg(owner)
2602            .arg("PX")
2603            .arg(lease_ttl_millis)
2604            .query(&mut conn)
2605            .expect("lease owner set should succeed");
2606    }
2607
2608    fn read_lease_owner(redis_url: &str, lease_key: &str) -> Option<String> {
2609        let redis_client =
2610            redis::Client::open(redis_url).expect("redis client should open");
2611        let mut conn = redis_client
2612            .get_connection()
2613            .expect("redis connection should open");
2614        redis::cmd("GET")
2615            .arg(lease_key)
2616            .query(&mut conn)
2617            .expect("lease owner get should succeed")
2618    }
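
    // Convenience sketch collapsing the owner-counting pattern repeated in
    // the quorum assertions above; the tests inline it instead, and the
    // helper name is illustrative only.
    #[allow(dead_code)]
    fn count_lease_owners(
        redis_urls: &[String],
        lease_key: &str,
        owner_token: &str,
    ) -> usize {
        redis_urls
            .iter()
            .filter(|redis_url| {
                read_lease_owner(redis_url, lease_key).as_deref() == Some(owner_token)
            })
            .count()
    }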
2619
2620    fn clear_lease_on_nodes(redis_urls: &[String], lease_key: &str) {
2621        redis_urls.iter().for_each(|redis_url| {
2622            let redis_client = redis::Client::open(redis_url.as_str())
2623                .expect("redis client should open");
2624            let mut conn = redis_client
2625                .get_connection()
2626                .expect("redis connection should open");
2627            let _: () = redis::cmd("DEL")
2628                .arg(lease_key)
2629                .query(&mut conn)
2630                .expect("lease delete should succeed");
2631        });
2632    }
2633
2634    fn stream_len(redis_url: &str, stream_key: &str) -> usize {
2635        let redis_client =
2636            redis::Client::open(redis_url).expect("redis client should open");
2637        let mut conn = redis_client
2638            .get_connection()
2639            .expect("redis connection should open");
2640        redis::cmd("XLEN")
2641            .arg(stream_key)
2642            .query(&mut conn)
2643            .expect("stream length query should succeed")
2644    }
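
    // Convenience sketch asserting the block-stream length on each node in
    // one call, mirroring the repeated stream_len assertions in the tests
    // below; the helper name is ours.
    #[allow(dead_code)]
    fn assert_stream_lens(redis_urls: &[String], stream_key: &str, expected: &[usize]) {
        assert_eq!(
            redis_urls.len(),
            expected.len(),
            "one expected length per node"
        );
        for (redis_url, expected_len) in redis_urls.iter().zip(expected) {
            assert_eq!(
                stream_len(redis_url, stream_key),
                *expected_len,
                "unexpected stream length on {redis_url}"
            );
        }
    }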
2645
2646    /// When Redis read calls fail on a quorum of nodes,
2647    /// `unreconciled_blocks` must return an error — not silently
2648    /// return an empty list that would let the caller produce
2649    /// a divergent block.
2650    #[tokio::test(flavor = "multi_thread")]
2651    async fn unreconciled_blocks__when_reads_fail_on_quorum_nodes__returns_error() {
2652        // given — 3 Redis nodes, leader A publishes block to all 3
2653        let mut redis_a = RedisTestServer::spawn();
2654        let mut redis_b = RedisTestServer::spawn();
2655        let redis_c = RedisTestServer::spawn();
2656        let lease_key = "poa:test:read-failure-fork".to_string();
2657        let redis_urls = vec![
2658            redis_a.redis_url(),
2659            redis_b.redis_url(),
2660            redis_c.redis_url(),
2661        ];
2662
2663        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
2664        assert!(
2665            adapter_a
2666                .acquire_lease_if_free()
2667                .await
2668                .expect("acquire should succeed"),
2669            "Leader A should acquire lease"
2670        );
2671
2672        let block = poa_block_at_time(1, 100);
2673        adapter_a
2674            .publish_produced_block(&block)
2675            .expect("publish should succeed on all 3 nodes");
2676
2677        // Verify block exists on all 3 nodes
2678        let stream_key = format!("{lease_key}:block:stream");
2679        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
2680        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
2681        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
2682
2683        // Simulate A releasing lease
2684        adapter_a.release().await.expect("release should succeed");
2685
2686        // when — kill 2 of 3 Redis nodes BEFORE new leader reconciles
2687        redis_a.stop();
2688        redis_b.stop();
2689
2690        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
2691        // Manually set epoch so we can call unreconciled_blocks directly
2692        {
2693            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
2694            *epoch = Some(99);
2695        }
2696
2697        let result = adapter_b.unreconciled_blocks(1.into()).await;
2698
2699        // then — must return Err, not Ok([])
2700        assert!(
2701            result.is_err(),
2702            "unreconciled_blocks must return error when reads fail on quorum of nodes"
2703        );
2704    }
2705
    /// When a Redis node restarts (losing all in-memory data), a block that
    /// was published to exactly quorum nodes temporarily drops below quorum.
    /// Reconciliation must repropose it from a surviving node and repair it
    /// back to quorum, rather than lose it and allow a fork.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__when_redis_node_restarts_and_loses_data__repairs_block_back_to_quorum()
     {
2712        // given — 3 Redis nodes, leader A publishes block to nodes A and B only
2713        // (simulating a partial publish where node C timed out)
2714        let redis_a = RedisTestServer::spawn();
2715        let mut redis_b = RedisTestServer::spawn();
2716        let redis_c = RedisTestServer::spawn();
2717        let lease_key = "poa:test:data-loss-fork".to_string();
2718        let stream_key = format!("{lease_key}:block:stream");
2719        let redis_urls = vec![
2720            redis_a.redis_url(),
2721            redis_b.redis_url(),
2722            redis_c.redis_url(),
2723        ];
2724
2725        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
2726        assert!(
2727            adapter_a
2728                .acquire_lease_if_free()
2729                .await
2730                .expect("acquire should succeed"),
2731            "Leader A should acquire lease"
2732        );
2733        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
2734            .expect("epoch should be set");
2735
2736        // Publish block to nodes A and B only (simulating node C timeout).
2737        // We write directly to simulate the partial publish that still
2738        // reaches quorum (2/3).
2739        let block = poa_block_at_time(1, 100);
2740        let block_data = postcard::to_allocvec(&block).expect("serialize");
2741
2742        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
2743        let mut conn_a = client_a.get_connection().expect("conn");
2744        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
2745        let mut conn_b = client_b.get_connection().expect("conn");
2746
2747        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, epoch_a as u32);
2748        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, epoch_a as u32);
2749        // Node C has no entry (simulated timeout during publish)
2750
2751        // Verify: block on A and B, not on C
2752        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
2753        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
2754        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 0);
2755
2756        // Confirm reconciliation works BEFORE data loss — quorum=2, both A and B have it
2757        let pre_loss = adapter_a
2758            .unreconciled_blocks(1.into())
2759            .await
2760            .expect("reconciliation should succeed");
2761        assert_eq!(
2762            pre_loss.len(),
2763            1,
2764            "Block should be reconcilable with 2/3 nodes having it"
2765        );
2766
2767        // when — Redis node B restarts (pod eviction / rolling deploy / AMI drift)
2768        // All in-memory data is lost (no persistence configured)
2769        drop(conn_b);
2770        drop(client_b);
2771        redis_b.stop();
2772        redis_b.start();
2773
2774        // Verify node B lost its stream data
2775        assert_eq!(
2776            stream_len(&redis_b.redis_url(), &stream_key),
2777            0,
2778            "Restarted node should have empty stream"
2779        );
2780
2781        // Release A's lease so B can acquire
2782        adapter_a.release().await.expect("release should succeed");
2783
2784        // New leader acquires
2785        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
2786        assert!(
2787            adapter_b
2788                .acquire_lease_if_free()
2789                .await
2790                .expect("acquire should succeed"),
2791            "New leader should acquire lease"
2792        );
2793
2794        let post_loss = adapter_b
2795            .unreconciled_blocks(1.into())
2796            .await
2797            .expect("reconciliation should succeed");
2798
2799        // then — repair reproposed the block from node A to node B (now empty)
2800        // and node C, reaching quorum again. The block is recovered.
2801        assert_eq!(
2802            post_loss.len(),
2803            1,
2804            "Repair should recover the block by reproposing from node A to the other nodes"
2805        );
2806    }
2807
    /// After an election storm where leader A wins on nodes A and B but
    /// another candidate held node C, `has_lease_owner_quorum` should
    /// expand the lock to node C once it is free. Subsequent block
    /// writes then go to all 3 nodes instead of just 2.
2812    #[tokio::test(flavor = "multi_thread")]
2813    async fn has_lease_owner_quorum__expands_lock_to_non_owned_nodes() {
2814        // given — 3 Redis nodes
2815        let redis_a = RedisTestServer::spawn();
2816        let redis_b = RedisTestServer::spawn();
2817        let redis_c = RedisTestServer::spawn();
2818        let lease_key = "poa:test:lock-expansion".to_string();
2819        let stream_key = format!("{lease_key}:block:stream");
2820        let redis_urls = vec![
2821            redis_a.redis_url(),
2822            redis_b.redis_url(),
2823            redis_c.redis_url(),
2824        ];
2825
2826        // Simulate election storm: candidate B grabs node C first
2827        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
2828        // Manually acquire on node C only (simulate B winning SET NX on C)
2829        {
2830            let client = redis::Client::open(redis_c.redis_url()).expect("client");
2831            let mut conn = client.get_connection().expect("conn");
2832            let _: () = redis::cmd("SET")
2833                .arg(&lease_key)
2834                .arg(&candidate_b.lease_owner_token)
2835                .arg("PX")
2836                .arg(5000u64)
2837                .query(&mut conn)
2838                .expect("set should succeed");
2839        }
2840
2841        // Leader A acquires — gets nodes A,B but not C (B holds it)
2842        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
2843        assert!(
2844            adapter_a
2845                .acquire_lease_if_free()
2846                .await
2847                .expect("acquire should succeed"),
2848            "Leader A should acquire quorum (2/3)"
2849        );
2850
2851        // Verify A owns nodes A,B but NOT node C
2852        let owns_a = read_lease_owner(&redis_a.redis_url(), &lease_key)
2853            == Some(adapter_a.lease_owner_token.clone());
2854        let owns_b = read_lease_owner(&redis_b.redis_url(), &lease_key)
2855            == Some(adapter_a.lease_owner_token.clone());
2856        let owns_c = read_lease_owner(&redis_c.redis_url(), &lease_key)
2857            == Some(adapter_a.lease_owner_token.clone());
2858        assert!(owns_a && owns_b, "A should own nodes A and B");
2859        assert!(!owns_c, "A should NOT own node C (held by B)");
2860
2861        // Candidate B releases node C (simulating failed-quorum cleanup)
2862        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);
2863        assert!(
2864            read_lease_owner(&redis_c.redis_url(), &lease_key).is_none(),
2865            "Node C should be free after B releases"
2866        );
2867
2868        // when — A calls has_lease_owner_quorum (which now expands)
2869        let has_quorum = adapter_a
2870            .has_lease_owner_quorum()
2871            .await
2872            .expect("quorum check should succeed");
2873        assert!(has_quorum, "A should still have quorum");
2874
2875        // then — A should now own node C too
2876        let owns_c_after = read_lease_owner(&redis_c.redis_url(), &lease_key)
2877            == Some(adapter_a.lease_owner_token.clone());
2878        assert!(owns_c_after, "Lock expansion should have acquired node C");
2879
2880        // Verify writes now go to all 3 nodes
2881        let block = poa_block_at_time(1, 100);
2882        adapter_a
2883            .publish_produced_block(&block)
2884            .expect("publish should succeed");
2885
2886        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
2887        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
2888        assert_eq!(
2889            stream_len(&redis_c.redis_url(), &stream_key),
2890            1,
2891            "Block should be written to expanded node C"
2892        );
2893    }
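
    // What "lock expansion" amounts to, as a minimal sketch: for every node
    // the leader does not currently own, attempt `SET key token NX PX ttl`,
    // which only succeeds while the key is free. This mirrors the behaviour
    // the test above asserts; `try_expand_lock_on_node` is a hypothetical
    // helper, not the adapter's actual method.
    #[allow(dead_code)]
    fn try_expand_lock_on_node(
        conn: &mut redis::Connection,
        lease_key: &str,
        owner_token: &str,
        ttl_millis: u64,
    ) -> bool {
        // `SET ... NX` replies "OK" when the key was free, and nil otherwise.
        let reply: Option<String> = redis::cmd("SET")
            .arg(lease_key)
            .arg(owner_token)
            .arg("NX")
            .arg("PX")
            .arg(ttl_millis)
            .query(conn)
            .unwrap_or(None);
        reply.is_some()
    }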

    /// When lock expansion acquires a node with a higher epoch
    /// (from election storm drift), the leader adopts the higher epoch
    /// so write_block.lua succeeds on all nodes.
    #[tokio::test(flavor = "multi_thread")]
    async fn has_lease_owner_quorum__adopts_higher_epoch_from_expanded_node() {
        // given — 3 Redis nodes
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:epoch-adoption".to_string();
        let epoch_key = format!("{lease_key}:epoch:token");
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // Simulate election storm: B promotes on node C (incrementing epoch),
        // then fails quorum and releases the lock, leaving the epoch drifted
        let candidate_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let client = redis::Client::open(redis_c.redis_url()).expect("client");
            let mut conn = client.get_connection().expect("conn");
            // Simulate B's promote_leader.lua on node C: SET NX + INCR
            let _: () = redis::cmd("SET")
                .arg(&lease_key)
                .arg(&candidate_b.lease_owner_token)
                .arg("PX")
                .arg(5000u64)
                .query(&mut conn)
                .expect("set should succeed");
            let _: u64 = redis::cmd("INCR")
                .arg(&epoch_key)
                .query(&mut conn)
                .expect("incr should succeed");
            // B releases (failed-quorum cleanup)
        }
        clear_lease_on_nodes(&[redis_c.redis_url()], &lease_key);

        // Node C now has epoch=1 but no lock owner
        let epoch_c_before = read_epoch(&redis_c.redis_url(), &epoch_key);

        // Leader A acquires on all free nodes (A, B, and C are all free now)
        let adapter_a = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter_a
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Leader A should acquire all free nodes"
        );
        let epoch_a = (*adapter_a.current_epoch_token.lock().expect("lock"))
            .expect("epoch should be set");

        // A's epoch should be the max across all 3 nodes:
        // node C had epoch=1 from B's INCR, then A's promote INCR'd it to 2;
        // nodes A and B were at 0, and A's promote INCR'd them to 1.
        // A takes max(1, 1, 2) = 2
        assert!(
            epoch_a > epoch_c_before,
            "Leader's epoch ({epoch_a}) should be > node C's pre-acquisition epoch ({epoch_c_before})"
        );

        // Verify writes succeed on ALL nodes with the adopted epoch
        let block = poa_block_at_time(1, 100);
        adapter_a
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");

        assert_eq!(stream_len(&redis_a.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_b.redis_url(), &stream_key), 1);
        assert_eq!(stream_len(&redis_c.redis_url(), &stream_key), 1);
    }
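
    // Hedged sketch of the epoch-adoption rule the test above relies on:
    // after a promote, the leader takes the maximum epoch reported by the
    // nodes that acknowledged, so a node with drifted state cannot fence out
    // the new leader's writes. `adopt_epoch` is a hypothetical helper, not
    // the adapter's real code.
    #[allow(dead_code)]
    fn adopt_epoch(per_node_epochs: &[u64]) -> Option<u64> {
        // None means no node replied, so no epoch can be adopted.
        per_node_epochs.iter().copied().max()
    }

    #[test]
    fn adopt_epoch__takes_the_max_across_nodes() {
        // Matches the scenario above: nodes A and B at 1, drifted node C at 2.
        assert_eq!(adopt_epoch(&[1, 1, 2]), Some(2));
        assert_eq!(adopt_epoch(&[]), None);
    }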

    /// Exercises promotion, block write, fencing rejection, repair, and
    /// reconciliation, then dumps `encode_metrics()` to verify all PoA
    /// metrics appear on the /v1/metrics endpoint with expected values.
    #[tokio::test(flavor = "multi_thread")]
    async fn metrics__poa_metrics_appear_in_encoded_output_after_exercising_all_paths() {
        // --- setup: 3 Redis nodes ---
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:metrics-smoke".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        // 1. Promotion (success path)
        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "adapter should acquire lease"
        );

        // 2. Successful block write
        let block1 = poa_block_at_time(1, 100);
        adapter
            .publish_produced_block(&block1)
            .expect("publish should succeed");

        // 3. HEIGHT_EXISTS — write the same height again
        let block1_dup = poa_block_at_time(1, 200);
        let dup_result = adapter.publish_produced_block(&block1_dup);
        assert!(dup_result.is_err(), "duplicate height should fail");

        // 4. Fencing rejection — a second adapter with a manually set stale
        //    epoch stands in for an old leader writing after losing the lease
        let old_adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let mut epoch = old_adapter.current_epoch_token.lock().expect("lock");
            *epoch = Some(1);
        }
        let _zombie = old_adapter.publish_produced_block(&poa_block_at_time(2, 300));

        // 5. Reconciliation with sub-quorum repair:
        //    put an orphan block on node A only at height 2
        let orphan = poa_block_at_time(2, 400);
        let orphan_data = postcard::to_allocvec(&orphan).expect("serialize orphan");
        let client_a = redis::Client::open(redis_a.redis_url()).expect("redis client");
        let mut conn_a = client_a.get_connection().expect("redis connection");
        let epoch_val =
            (*adapter.current_epoch_token.lock().expect("lock")).expect("epoch set");
        #[allow(clippy::cast_possible_truncation)]
        let epoch_u32 = epoch_val as u32;
        append_stream_block(&mut conn_a, &stream_key, 2, &orphan_data, epoch_u32);

        // 6. leader_state triggers reconciliation + repair
        let state = adapter
            .leader_state(2.into())
            .await
            .expect("leader_state should succeed");
        assert!(
            matches!(
                state,
                LeaderState::UnreconciledBlocks(ref blocks) if !blocks.is_empty()
            ),
            "Should have unreconciled blocks: {state:?}"
        );

        // --- encode and verify ---
        let encoded =
            fuel_core_metrics::encode_metrics().expect("encode_metrics should succeed");

        // Print the PoA lines for visual inspection
        let poa_lines: Vec<&str> =
            encoded.lines().filter(|l| l.contains("poa_")).collect();
        for line in &poa_lines {
            eprintln!("{line}");
        }

        // Verify all metric names appear.
        // Counters get `_total` appended by prometheus-client automatically.
        let expected_names = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_epoch_max_drift",
            "poa_stream_trim_headroom",
            "poa_write_block_success_total",
            "poa_write_block_height_exists_total",
            "poa_write_block_fencing_error_total",
            "poa_write_block_error_total",
            "poa_repair_success_total",
            "poa_promotion_success_total",
            "poa_promotion_duration_s",
            "poa_write_block_duration_s",
            "poa_reconciliation_duration_s",
            "poa_connection_reset_total",
        ];
        for name in &expected_names {
            assert!(
                encoded.contains(name),
                "Metric '{name}' missing from /v1/metrics output"
            );
        }

        // Verify key metrics have non-zero values.
        // For counters, the data line is e.g. `poa_write_block_success_total 3`;
        // for gauges, it's e.g. `poa_leader_epoch 2`. We find the line that
        // starts with the bare name, excluding sub-metric lines (such as
        // `_bucket`, `_sum`, `_count`) and comment lines.
        let non_zero_metrics = [
            "poa_leader_epoch",
            "poa_is_leader",
            "poa_write_block_success_total",
            "poa_promotion_success_total",
            "poa_repair_success_total",
        ];
        for name in &non_zero_metrics {
            let metric_line = encoded
                .lines()
                .find(|l| {
                    l.starts_with(name)
                        && !l.starts_with(&format!("{name}_"))
                        && !l.starts_with('#')
                })
                .unwrap_or_else(|| panic!("No data line for {name}"));
            assert!(
                !metric_line.ends_with(" 0"),
                "Metric '{name}' should be non-zero, got: {metric_line}"
            );
        }
    }
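
    // The data-line lookup above, factored out as a sketch for clarity: find
    // the exposition line that starts with the bare name (comment lines start
    // with `#`, histogram sub-series with `{name}_`), then parse the value
    // after the last space. Assumes unlabelled metrics, as in this test;
    // `metric_value` is a hypothetical helper.
    #[allow(dead_code)]
    fn metric_value(encoded: &str, name: &str) -> Option<f64> {
        let sub_series_prefix = format!("{name}_");
        encoded
            .lines()
            .find(|l| l.starts_with(name) && !l.starts_with(&sub_series_prefix))
            .and_then(|l| l.rsplit(' ').next())
            .and_then(|v| v.parse().ok())
    }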

    /// When quorum reads fail during reconciliation, a subsequent call
    /// should still be able to read the same backlog entries.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__after_quorum_read_failure_then_backlog_remains_readable()
    {
        // given — 3 Redis nodes, block published to all 3
        let mut redis_a = RedisTestServer::spawn();
        let mut redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-restore-quorum".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Should acquire lease"
        );

        let block = poa_block_at_time(1, 100);
        adapter
            .publish_produced_block(&block)
            .expect("publish should succeed on all 3 nodes");
        adapter.release().await.expect("release should succeed");

        // when — kill 2 nodes so the quorum read fails
        redis_a.stop();
        redis_b.stop();

        let adapter_b = new_test_adapter(redis_urls.clone(), lease_key.clone());
        {
            let mut epoch = adapter_b.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }
        let result = adapter_b.unreconciled_blocks(1.into()).await;
        assert!(result.is_err(), "Should fail with quorum read failure");

        // Restart the killed nodes — all 3 are now reachable
        redis_a.start();
        redis_b.start();

        // Re-publish the block to the restarted nodes so they have data
        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        let client_b = redis::Client::open(redis_b.redis_url()).expect("client");
        let mut conn_b = client_b.get_connection().expect("conn");
        let block_data = postcard::to_allocvec(&block).expect("serialize");
        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);
        append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 1);

        // then — the subsequent call must still see the block on node C
        let blocks = adapter_b
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed now");
        assert_eq!(
            blocks.len(),
            1,
            "Quorum read failure should not make backlog entries unreadable"
        );
    }
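
    // The quorum arithmetic behind the failure above, as a one-line sketch,
    // assuming the usual majority threshold of `total_nodes / 2 + 1`: one
    // live node out of three cannot satisfy a read, so the call must error
    // instead of silently advancing past the backlog. `has_read_quorum` is
    // a hypothetical helper.
    #[allow(dead_code)]
    fn has_read_quorum(successes: usize, total_nodes: usize) -> bool {
        successes >= total_nodes / 2 + 1
    }

    #[test]
    fn has_read_quorum__one_of_three_is_not_enough() {
        assert!(!has_read_quorum(1, 3));
        assert!(has_read_quorum(2, 3));
    }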

    /// When sub-quorum repair fails, the next reconciliation round
    /// should still be able to re-read and retry.
    #[tokio::test(flavor = "multi_thread")]
    async fn unreconciled_blocks__after_repair_failure_then_backlog_remains_readable() {
        // given — 3 Redis nodes, block published to only 1 node (sub-quorum)
        let redis_a = RedisTestServer::spawn();
        let redis_b = RedisTestServer::spawn();
        let redis_c = RedisTestServer::spawn();
        let lease_key = "poa:test:cursor-restore-repair".to_string();
        let stream_key = format!("{lease_key}:block:stream");
        let redis_urls = vec![
            redis_a.redis_url(),
            redis_b.redis_url(),
            redis_c.redis_url(),
        ];

        let block = poa_block_at_time(1, 100);
        let block_data = postcard::to_allocvec(&block).expect("serialize");

        // Write the block to node A only — sub-quorum (1/3)
        let client_a = redis::Client::open(redis_a.redis_url()).expect("client");
        let mut conn_a = client_a.get_connection().expect("conn");
        append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 1);

        // Adapter without a lease — repair will fail (no lock held)
        let adapter = new_test_adapter(redis_urls.clone(), lease_key.clone());
        // Set the epoch but do NOT acquire the lease — repair_sub_quorum_block
        // will get FencingRejected or fail to reach quorum
        {
            let mut epoch = adapter.current_epoch_token.lock().expect("lock");
            *epoch = Some(99);
        }

        // The first call reads entries, then repair fails because we don't
        // hold the lock; repair failure returns an error because the backlog
        // remains unresolved.
        let result = adapter.unreconciled_blocks(1.into()).await;
        assert!(
            result.is_err(),
            "Should return error when repair fails and backlog remains unresolved"
        );

        // Now acquire the lease so repair can succeed
        assert!(
            adapter
                .acquire_lease_if_free()
                .await
                .expect("acquire should succeed"),
            "Should acquire lease"
        );

        // then — the second call must still see the sub-quorum block
        let blocks = adapter
            .unreconciled_blocks(1.into())
            .await
            .expect("reconciliation should succeed with lock held");
        assert_eq!(
            blocks.len(),
            1,
            "Repair failure should not make backlog unreadable on the next round"
        );
    }
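
    // Sketch of the sub-quorum classification driving the repair path above:
    // an entry present on fewer than `quorum` nodes is a repair candidate,
    // and a failed repair must leave the read cursor untouched so the next
    // round re-reads the same entry. `is_sub_quorum` is a hypothetical
    // helper, assuming copies are counted per stream entry.
    #[allow(dead_code)]
    fn is_sub_quorum(copies_seen: usize, quorum: usize) -> bool {
        copies_seen < quorum
    }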
}