mod private
{
use std::
{
sync ::{ Arc, Mutex },
time ::Instant,
};
use core::time::Duration;
use serde::{ Deserialize, Serialize };
use tokio::sync::mpsc;
/// Health state recorded for a failover endpoint.
///
/// `Healthy` and `Degraded` endpoints are treated as available by
/// `FailoverEndpoint::is_available`; only `Healthy` ones are preferred by
/// `FailoverEndpoint::is_preferred`.
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum EndpointHealth
{
/// Counts as both available and preferred.
Healthy,
/// Counts as available but not preferred.
Degraded,
/// Counts as neither available nor preferred, so it is never selected.
Unhealthy,
/// Initial state assigned by `FailoverEndpoint::new`; not available for selection.
Unknown,
}
/// Policy `FailoverManager::select_endpoint` uses to pick among the endpoints
/// that are available and not yet tried.
#[ derive( Debug, Clone, PartialEq, Serialize, Deserialize ) ]
pub enum FailoverStrategy
{
/// Walk through the candidates in order using an index stored in the manager.
RoundRobin,
/// Pick the candidate with the largest `priority` value.
Priority,
/// Pick an arbitrary candidate.
Random,
/// Pick the first `Healthy` candidate, falling back to the first candidate.
Sticky,
}
/// A single endpoint requests can be routed to, together with its last recorded health.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct FailoverEndpoint
{
/// Identifier used to look the endpoint up and to track it in `FailoverContext`.
pub id : String,
/// Endpoint address. NOTE(review): never interpreted inside this module.
pub url : String,
/// Weight used by `FailoverStrategy::Priority`; the largest value wins.
pub priority : i32,
/// Timeout attached to the endpoint.
/// NOTE(review): presumably a per-request timeout; it is not read in this module — confirm with callers.
pub timeout : Duration,
/// Most recently recorded health state.
pub health : EndpointHealth,
/// Moment `health` was last set. Skipped by serde; deserialization fills it with `Instant::now()`.
#[ serde( skip, default = "Instant::now" ) ]
pub last_checked : Instant,
}
impl FailoverEndpoint
{
  /// Builds an endpoint whose health is `EndpointHealth::Unknown` and whose
  /// `last_checked` timestamp is the moment of construction.
  #[ inline ]
  #[ must_use ]
  pub fn new( id : String, url : String, priority : i32, timeout : Duration ) -> Self
  {
    let health = EndpointHealth::Unknown;
    let last_checked = Instant::now();
    Self { id, url, priority, timeout, health, last_checked }
  }

  /// Returns `true` when the endpoint may be selected, i.e. its health is
  /// `Healthy` or `Degraded`.
  #[ inline ]
  #[ must_use ]
  pub fn is_available( &self ) -> bool
  {
    match self.health
    {
      EndpointHealth::Healthy | EndpointHealth::Degraded => true,
      EndpointHealth::Unhealthy | EndpointHealth::Unknown => false,
    }
  }

  /// Returns `true` only when the endpoint's health is `Healthy`.
  #[ inline ]
  #[ must_use ]
  pub fn is_preferred( &self ) -> bool
  {
    self.health == EndpointHealth::Healthy
  }

  /// Records a new health state and stamps `last_checked` with the current time.
  #[ inline ]
  pub fn update_health( &mut self, health : EndpointHealth )
  {
    self.last_checked = Instant::now();
    self.health = health;
  }

  /// Returns how much time has passed since the health was last recorded.
  #[ inline ]
  #[ must_use ]
  pub fn time_since_check( &self ) -> Duration
  {
    Instant::now().duration_since( self.last_checked )
  }
}
/// Tunable settings for endpoint selection and retry back-off.
#[ derive( Debug, Clone, Serialize, Deserialize ) ]
pub struct FailoverConfig
{
/// Policy used to pick the next endpoint.
pub strategy : FailoverStrategy,
/// Upper bound on the number of attempts made by `FailoverExecutor::execute_with_failover`.
pub max_retries : u32,
/// Base back-off delay in milliseconds; doubled for each further attempt.
pub retry_delay_ms : u64,
/// Cap, in milliseconds, applied to the computed back-off delay.
pub max_retry_delay_ms : u64,
/// NOTE(review): presumably the period between health probes; not read anywhere in this module — confirm with callers.
pub health_check_interval_ms : u64,
/// NOTE(review): presumably an overall failover deadline; not read anywhere in this module — confirm with callers.
pub failover_timeout_ms : u64,
}
impl Default for FailoverConfig
{
  /// Priority-based selection, three attempts, back-off starting at one second
  /// and capped at thirty, a thirty-second health-check interval and a
  /// ten-second failover timeout.
  #[ inline ]
  fn default() -> Self
  {
    let strategy = FailoverStrategy::Priority;
    let max_retries = 3;
    let retry_delay_ms = 1_000;
    let max_retry_delay_ms = 30_000;
    let health_check_interval_ms = 30_000;
    let failover_timeout_ms = 10_000;
    Self
    {
      strategy,
      max_retries,
      retry_delay_ms,
      max_retry_delay_ms,
      health_check_interval_ms,
      failover_timeout_ms,
    }
  }
}
/// Per-operation bookkeeping handed to the user operation on every attempt.
#[ derive( Debug, Clone ) ]
pub struct FailoverContext
{
/// Attempt number, starting at 1 and incremented by `next_attempt`.
pub attempt : u32,
/// Endpoint chosen for the current attempt.
pub endpoint : FailoverEndpoint,
/// Start time of the current attempt; reset by `next_attempt`.
pub started_at : Instant,
/// Ids of the endpoints used by earlier attempts, oldest first.
pub failed_endpoints : Vec< String >,
}
impl FailoverContext
{
  /// Starts a context for the first attempt (`attempt == 1`) against `endpoint`,
  /// with no failed endpoints recorded.
  #[ inline ]
  #[ must_use ]
  pub fn new( endpoint : FailoverEndpoint ) -> Self
  {
    Self
    {
      attempt : 1,
      endpoint,
      started_at : Instant::now(),
      failed_endpoints : Vec::new(),
    }
  }

  /// Advances to the next attempt.
  ///
  /// The id of the endpoint used so far is appended to `failed_endpoints`,
  /// `attempt` is incremented, `endpoint` becomes the current endpoint and
  /// `started_at` is reset to now.
  #[ inline ]
  #[ must_use ]
  pub fn next_attempt( mut self, endpoint : FailoverEndpoint ) -> Self
  {
    // Swap the endpoints first so the outgoing id can be moved, not cloned.
    let previous = std::mem::replace( &mut self.endpoint, endpoint );
    self.failed_endpoints.push( previous.id );
    self.attempt += 1;
    self.started_at = Instant::now();
    self
  }

  /// Returns `true` once `attempt` is greater than `max_retries`.
  #[ inline ]
  #[ must_use ]
  pub fn is_exhausted( &self, max_retries : u32 ) -> bool
  {
    self.attempt > max_retries
  }

  /// Time elapsed since the current attempt started.
  #[ inline ]
  #[ must_use ]
  pub fn elapsed( &self ) -> Duration
  {
    self.started_at.elapsed()
  }

  /// Returns `true` when `endpoint_id` is the current endpoint or one of the
  /// endpoints recorded in `failed_endpoints`.
  #[ inline ]
  #[ must_use ]
  pub fn was_endpoint_tried( &self, endpoint_id : &str ) -> bool
  {
    // Compare borrowed strings directly; no temporary `String` is built per lookup.
    self.endpoint.id == endpoint_id
      || self.failed_endpoints.iter().any( | tried | tried == endpoint_id )
  }
}
/// Owns the failover configuration and the endpoint list, and selects
/// endpoints according to the configured strategy.
#[ derive( Debug ) ]
pub struct FailoverManager
{
/// Strategy and retry settings.
config : FailoverConfig,
/// Every known endpoint, whatever its health.
endpoints : Vec< FailoverEndpoint >,
/// Position used by round-robin selection; behind a mutex so it can advance through `&self`.
round_robin_index : Arc< Mutex< usize > >,
}
impl FailoverManager
{
  /// Creates a manager over `endpoints` with the round-robin position at zero.
  #[ inline ]
  #[ must_use ]
  pub fn new( config : FailoverConfig, endpoints : Vec< FailoverEndpoint > ) -> Self
  {
    Self
    {
      config,
      endpoints,
      round_robin_index : Arc::new( Mutex::new( 0 ) ),
    }
  }

  /// Configuration this manager was built with.
  #[ inline ]
  #[ must_use ]
  pub fn config( &self ) -> &FailoverConfig
  {
    &self.config
  }

  /// Every endpoint known to the manager, whatever its health.
  #[ inline ]
  #[ must_use ]
  pub fn endpoints( &self ) -> &Vec< FailoverEndpoint >
  {
    &self.endpoints
  }

  /// Records `health` for the endpoint whose id equals `endpoint_id`.
  ///
  /// Unknown ids are ignored.
  #[ inline ]
  pub fn update_endpoint_health( &mut self, endpoint_id : &str, health : EndpointHealth )
  {
    if let Some( endpoint ) = self.endpoints.iter_mut().find( | e | e.id == endpoint_id )
    {
      endpoint.update_health( health );
    }
  }

  /// Picks the next endpoint to use, as an owned copy.
  ///
  /// Candidates are the endpoints that are available and, when `context` is
  /// given, have not already been tried in it. Returns `None` when there is
  /// no such endpoint; otherwise the configured strategy chooses among them.
  #[ inline ]
  #[ must_use ]
  pub fn select_endpoint( &self, context : Option< &FailoverContext > ) -> Option< FailoverEndpoint >
  {
    let available_endpoints : Vec< &FailoverEndpoint > = self.endpoints
    .iter()
    .filter( | e | e.is_available() )
    .filter( | e | context.map_or( true, | ctx | !ctx.was_endpoint_tried( &e.id ) ) )
    .collect();

    if available_endpoints.is_empty()
    {
      return None;
    }

    match self.config.strategy
    {
      FailoverStrategy::RoundRobin => self.select_round_robin( &available_endpoints ),
      FailoverStrategy::Priority => Self::select_priority( &available_endpoints ),
      FailoverStrategy::Random => Self::select_random( &available_endpoints ),
      FailoverStrategy::Sticky => Self::select_sticky( &available_endpoints ),
    }
  }

  /// Returns the candidate at the stored round-robin position, then advances
  /// the position, wrapping at the number of candidates.
  fn select_round_robin( &self, endpoints : &[ &FailoverEndpoint ] ) -> Option< FailoverEndpoint >
  {
    if endpoints.is_empty()
    {
      return None;
    }
    // The guarded value is a plain counter, so a poisoned lock cannot hold a
    // broken invariant; recover the guard instead of panicking.
    let mut cursor = self.round_robin_index
    .lock()
    .unwrap_or_else( std::sync::PoisonError::into_inner );
    let slot = *cursor % endpoints.len();
    *cursor = ( slot + 1 ) % endpoints.len();
    endpoints.get( slot ).map( | e | ( *e ).clone() )
  }

  /// Returns the candidate with the largest `priority` value.
  fn select_priority( endpoints : &[ &FailoverEndpoint ] ) -> Option< FailoverEndpoint >
  {
    endpoints
    .iter()
    .max_by_key( | e | e.priority )
    .map( | e | ( *e ).clone() )
  }

  /// Returns an arbitrary candidate.
  ///
  /// The index comes from hashing nothing with a fresh `RandomState`, whose
  /// keys are randomly seeded by the standard library. The previous source,
  /// the time elapsed since an instant created on the same line, is always
  /// close to zero and therefore biased the choice.
  fn select_random( endpoints : &[ &FailoverEndpoint ] ) -> Option< FailoverEndpoint >
  {
    if endpoints.is_empty()
    {
      return None;
    }
    let state = std::collections::hash_map::RandomState::new();
    let roll = std::hash::Hasher::finish( &std::hash::BuildHasher::build_hasher( &state ) );
    // `usize` -> `u64` is lossless on supported targets, and the remainder is
    // below `len`, so it fits back into `usize`.
    let index = ( roll % endpoints.len() as u64 ) as usize;
    endpoints.get( index ).map( | e | ( *e ).clone() )
  }

  /// Returns the first `Healthy` candidate, or the first candidate when none is healthy.
  fn select_sticky( endpoints : &[ &FailoverEndpoint ] ) -> Option< FailoverEndpoint >
  {
    endpoints
    .iter()
    .find( | e | e.is_preferred() )
    .or_else( || endpoints.first() )
    .map( | e | ( *e ).clone() )
  }

  /// Number of endpoints whose health is `Healthy`.
  #[ inline ]
  #[ must_use ]
  pub fn healthy_endpoint_count( &self ) -> usize
  {
    self.endpoints.iter().filter( | e | e.is_preferred() ).count()
  }

  /// Number of endpoints whose health is `Healthy` or `Degraded`.
  #[ inline ]
  #[ must_use ]
  pub fn available_endpoint_count( &self ) -> usize
  {
    self.endpoints.iter().filter( | e | e.is_available() ).count()
  }

  /// Back-off delay to wait after the given 1-based `attempt`.
  ///
  /// The delay is `retry_delay_ms * 2^(attempt - 1)`, computed with saturating
  /// arithmetic and capped at `max_retry_delay_ms`. An `attempt` of 0 is
  /// treated like 1.
  #[ inline ]
  #[ must_use ]
  pub fn calculate_retry_delay( &self, attempt : u32 ) -> Duration
  {
    let base_delay = Duration::from_millis( self.config.retry_delay_ms );
    let max_delay = Duration::from_millis( self.config.max_retry_delay_ms );
    let multiplier = 2_u64.saturating_pow( attempt.saturating_sub( 1 ) );
    // `Duration` multiplies by `u32`; multipliers beyond that range saturate.
    let factor = u32::try_from( multiplier ).unwrap_or( u32::MAX );
    base_delay.saturating_mul( factor ).min( max_delay )
  }
}
/// Stateless collection of helpers that drive a `FailoverManager`.
#[ derive( Debug ) ]
pub struct FailoverExecutor;

impl FailoverExecutor
{
  /// Runs `operation` against successive endpoints until it succeeds or the
  /// attempt budget (`max_retries`) is used up.
  ///
  /// Each attempt selects an endpoint not yet tried in this call, passes a
  /// clone of the current `FailoverContext` to `operation`, and on failure
  /// sleeps for `calculate_retry_delay( attempt )` before the next attempt.
  ///
  /// # Errors
  ///
  /// * `NoAvailableEndpoints` - no available, untried endpoint could be
  ///   selected for an attempt.
  /// * `AllEndpointsFailed` - the final permitted attempt failed; carries
  ///   that attempt's error.
  /// * `MaxRetriesExceeded` - the loop ended without running a final
  ///   attempt, which happens when `max_retries` is 0.
  #[ inline ]
  pub async fn execute_with_failover< T, E, F, Fut >(
    manager : &FailoverManager,
    operation : F,
  ) -> Result< T, FailoverError< E > >
  where
    F : Fn( FailoverContext ) -> Fut + Send + Sync,
    Fut : core::future::Future< Output = Result< T, E > > + Send,
    E : Send + Sync + 'static,
  {
    let attempt_budget = manager.config.max_retries;
    let mut context : Option< FailoverContext > = None;

    for attempt in 1..=attempt_budget
    {
      let endpoint = match manager.select_endpoint( context.as_ref() )
      {
        Some( candidate ) => candidate,
        None => return Err( FailoverError::NoAvailableEndpoints ),
      };

      // Either start the bookkeeping or roll it forward to the new endpoint.
      let current = match context.take()
      {
        Some( previous ) => previous.next_attempt( endpoint ),
        None => FailoverContext::new( endpoint ),
      };
      let handed_over = current.clone();
      context = Some( current );

      match operation( handed_over ).await
      {
        Ok( result ) => return Ok( result ),
        Err( error ) =>
        {
          if attempt == attempt_budget
          {
            return Err( FailoverError::AllEndpointsFailed( Box::new( error ) ) );
          }
          tokio::time::sleep( manager.calculate_retry_delay( attempt ) ).await;
        }
      }
    }

    Err( FailoverError::MaxRetriesExceeded )
  }

  /// Creates a connected sender/receiver pair for `FailoverEvent`s, backed by
  /// an unbounded channel.
  #[ inline ]
  #[ must_use ]
  pub fn create_failover_notifier() -> ( FailoverEventSender, FailoverEventReceiver )
  {
    let ( sender, receiver ) = mpsc::unbounded_channel();
    let event_sender = FailoverEventSender { sender };
    let event_receiver = FailoverEventReceiver { receiver };
    ( event_sender, event_receiver )
  }

  /// Checks the retry settings of `config`.
  ///
  /// # Errors
  ///
  /// Returns the message of the first violated rule, checked in this order:
  /// `max_retries` is 0, `retry_delay_ms` is 0, `max_retry_delay_ms` is
  /// smaller than `retry_delay_ms`.
  #[ inline ]
  pub fn validate_config( config : &FailoverConfig ) -> Result< (), String >
  {
    let rules =
    [
      ( config.max_retries == 0, "max_retries must be greater than 0" ),
      ( config.retry_delay_ms == 0, "retry_delay_ms must be greater than 0" ),
      (
        config.max_retry_delay_ms < config.retry_delay_ms,
        "max_retry_delay_ms must be greater than or equal to retry_delay_ms",
      ),
    ];

    match rules.iter().find( | rule | rule.0 )
    {
      Some( rule ) => Err( rule.1.to_string() ),
      None => Ok( () ),
    }
  }

  /// Builds a manager with the default configuration from `( id, url, priority )`
  /// triples; every endpoint gets a 30-second timeout.
  #[ inline ]
  #[ must_use ]
  pub fn create_basic_manager( endpoints : Vec< ( String, String, i32 ) > ) -> FailoverManager
  {
    let default_timeout = Duration::from_secs( 30 );
    let mut failover_endpoints : Vec< FailoverEndpoint > = Vec::with_capacity( endpoints.len() );
    for ( id, url, priority ) in endpoints
    {
      failover_endpoints.push( FailoverEndpoint::new( id, url, priority, default_timeout ) );
    }
    FailoverManager::new( FailoverConfig::default(), failover_endpoints )
  }
}
/// Failure modes of a failover run; `E` is the error type of the user operation.
#[ derive( Debug ) ]
pub enum FailoverError< E >
{
  /// No endpoint could be selected for an attempt.
  NoAvailableEndpoints,
  /// The last permitted attempt failed; holds that attempt's error.
  AllEndpointsFailed( Box< E > ),
  /// The attempt budget ran out without a final result.
  MaxRetriesExceeded,
  /// The configuration was rejected; holds a description of the problem.
  ConfigurationError( String ),
}

impl< E > core::fmt::Display for FailoverError< E >
where
  E : core::fmt::Display,
{
  /// Writes a one-line description; variants with a payload append it after
  /// the fixed text.
  #[ inline ]
  fn fmt( &self, f : &mut core::fmt::Formatter< '_ > ) -> core::fmt::Result
  {
    match self
    {
      Self::NoAvailableEndpoints => f.write_str( "No endpoints are available for failover" ),
      Self::MaxRetriesExceeded => f.write_str( "Maximum retry attempts exceeded" ),
      Self::AllEndpointsFailed( error ) => write!( f, "All endpoints failed : {error}" ),
      Self::ConfigurationError( msg ) => write!( f, "Configuration error : {msg}" ),
    }
  }
}

impl< E > std::error::Error for FailoverError< E >
where
  E : std::error::Error + 'static,
{
  /// Exposes the wrapped operation error for `AllEndpointsFailed`; every other
  /// variant has no source.
  #[ inline ]
  fn source( &self ) -> Option< &( dyn std::error::Error + 'static ) >
  {
    match self
    {
      Self::AllEndpointsFailed( error ) => Some( error.as_ref() ),
      Self::NoAvailableEndpoints
      | Self::MaxRetriesExceeded
      | Self::ConfigurationError( _ ) => None,
    }
  }
}
/// Notifications carried over the channel created by
/// `FailoverExecutor::create_failover_notifier`.
///
/// NOTE(review): within this module only `HealthChanged` and `FailoverStarted`
/// have sender helpers; the other variants are presumably emitted by callers
/// through `FailoverEventSender::send_event` — confirm.
#[ derive( Debug, Clone ) ]
pub enum FailoverEvent
{
/// An endpoint's health moved from `old_health` to `new_health`.
HealthChanged
{
endpoint_id : String,
old_health : EndpointHealth,
new_health : EndpointHealth,
},
/// An attempt against `endpoint_id` is starting; `attempt` is the attempt number.
FailoverStarted
{
endpoint_id : String,
attempt : u32,
},
/// An attempt against `endpoint_id` succeeded.
FailoverSucceeded
{
endpoint_id : String,
attempt : u32,
},
/// An attempt against `endpoint_id` failed; `error` is a textual description.
FailoverFailed
{
endpoint_id : String,
attempt : u32,
error : String,
},
/// No further endpoints remained; `total_attempts` is the number of attempts made.
AllEndpointsExhausted
{
total_attempts : u32,
},
}
/// Sending half of the failover event channel.
#[ derive( Debug, Clone ) ]
pub struct FailoverEventSender
{
  sender : mpsc::UnboundedSender< FailoverEvent >,
}

impl FailoverEventSender
{
  /// Queues `event` on the channel.
  ///
  /// # Errors
  ///
  /// Returns a fixed message when the underlying channel rejects the event.
  #[ inline ]
  pub fn send_event( &self, event : FailoverEvent ) -> Result< (), &'static str >
  {
    match self.sender.send( event )
    {
      Ok( () ) => Ok( () ),
      Err( _ ) => Err( "Failed to send failover event" ),
    }
  }

  /// Queues a `FailoverEvent::HealthChanged` event.
  ///
  /// # Errors
  ///
  /// Same as `send_event`.
  #[ inline ]
  pub fn send_health_change( &self, endpoint_id : String, old_health : EndpointHealth, new_health : EndpointHealth ) -> Result< (), &'static str >
  {
    let event = FailoverEvent::HealthChanged { endpoint_id, old_health, new_health };
    self.send_event( event )
  }

  /// Queues a `FailoverEvent::FailoverStarted` event.
  ///
  /// # Errors
  ///
  /// Same as `send_event`.
  #[ inline ]
  pub fn send_failover_started( &self, endpoint_id : String, attempt : u32 ) -> Result< (), &'static str >
  {
    let event = FailoverEvent::FailoverStarted { endpoint_id, attempt };
    self.send_event( event )
  }
}
/// Receiving half of the failover event channel.
#[ derive( Debug ) ]
pub struct FailoverEventReceiver
{
  receiver : mpsc::UnboundedReceiver< FailoverEvent >,
}

impl FailoverEventReceiver
{
  /// Returns an event without waiting, or `None` when the underlying
  /// `try_recv` reports an error.
  #[ inline ]
  pub fn try_recv( &mut self ) -> Option< FailoverEvent >
  {
    match self.receiver.try_recv()
    {
      Ok( event ) => Some( event ),
      Err( _ ) => None,
    }
  }

  /// Awaits the next event and returns whatever the underlying channel's
  /// `recv` yields.
  #[ inline ]
  pub async fn recv( &mut self ) -> Option< FailoverEvent >
  {
    let next = self.receiver.recv().await;
    next
  }
}
}
// Public surface of this file: every item listed below lives in the `private`
// module above and is re-exported through the crate's `mod_interface!` macro.
// NOTE(review): the exact visibility implied by `exposed` is defined by that
// macro, which is not visible here — confirm against its documentation.
crate ::mod_interface!
{
exposed use private::EndpointHealth;
exposed use private::FailoverStrategy;
exposed use private::FailoverEndpoint;
exposed use private::FailoverConfig;
exposed use private::FailoverContext;
exposed use private::FailoverManager;
exposed use private::FailoverExecutor;
exposed use private::FailoverError;
exposed use private::FailoverEvent;
exposed use private::FailoverEventSender;
exposed use private::FailoverEventReceiver;
}