use crate::client::transports::Client;
use crate::client::config::ClientConfig;
use crate::client::connection::ConnectionStateManager;
use crate::common::error::Result;
use crate::client::heartbeat::HeartbeatManager;
use crate::common::MessageParser;
use crate::common::protocol::Frame;
use crate::transport::events::{ArcObserver, ConnectionEvent};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info, warn};
pub struct ClientConnectionManager {
config: ClientConfig,
client: Arc<Mutex<Box<dyn Client>>>,
state_manager: Arc<ConnectionStateManager>,
heartbeat_manager: Arc<Mutex<Option<HeartbeatManager>>>,
parser: MessageParser,
observers: Arc<StdMutex<Vec<ArcObserver>>>,
reconnect_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
is_reconnecting: Arc<Mutex<bool>>,
}
impl ClientConnectionManager {
pub fn new(client: Box<dyn Client>, config: ClientConfig) -> Self {
let parser = MessageParser::new(config.serialization_format, config.compression);
Self {
config,
client: Arc::new(Mutex::new(client)),
state_manager: Arc::new(ConnectionStateManager::new()),
heartbeat_manager: Arc::new(Mutex::new(None)),
parser,
observers: Arc::new(StdMutex::new(Vec::new())),
reconnect_handle: Arc::new(Mutex::new(None)),
is_reconnecting: Arc::new(Mutex::new(false)),
}
}
pub async fn connect(&self) -> Result<()> {
info!("Connecting to server: {}", self.config.server_url);
let mut client = self.client.lock().await;
if client.is_connected() {
debug!("Already connected, skipping connect");
return Ok(());
}
self.state_manager.set_state(crate::client::connection::ConnectionState::Connecting);
match client.connect().await {
Ok(()) => {
info!("Successfully connected to server");
self.state_manager.set_state(crate::client::connection::ConnectionState::Connected);
if self.config.heartbeat.enabled {
self.start_heartbeat().await?;
}
self.notify_observers(&ConnectionEvent::Connected);
Ok(())
}
Err(e) => {
error!("Failed to connect: {}", e);
self.state_manager.set_state(crate::client::connection::ConnectionState::Disconnected);
Err(e)
}
}
}
pub async fn disconnect(&self) -> Result<()> {
info!("Disconnecting from server");
self.stop_reconnect().await;
self.stop_heartbeat().await;
let mut client = self.client.lock().await;
let result = client.disconnect().await;
self.state_manager.set_state(crate::client::connection::ConnectionState::Disconnected);
self.notify_observers(&ConnectionEvent::Disconnected(String::new()));
result
}
pub async fn send_frame(&self, frame: &Frame) -> Result<()> {
if !self.state_manager.get_state().can_send() {
return Err(crate::common::error::FlareError::connection_failed(
"Not connected".to_string()
));
}
let mut client = self.client.lock().await;
client.send_frame(frame).await
}
async fn start_heartbeat(&self) -> Result<()> {
if !self.config.heartbeat.enabled {
return Ok(());
}
debug!("Starting heartbeat: interval={:?}, timeout={:?}",
self.config.heartbeat.interval,
self.config.heartbeat.timeout);
let mut heartbeat = HeartbeatManager::new(
self.config.heartbeat.interval,
self.config.heartbeat.timeout,
);
let mut hb_mgr = self.heartbeat_manager.lock().await;
*hb_mgr = Some(heartbeat);
Ok(())
}
async fn stop_heartbeat(&self) {
let mut hb_mgr = self.heartbeat_manager.lock().await;
if let Some(mut hb) = hb_mgr.take() {
hb.stop();
}
}
pub async fn start_auto_reconnect(&self) {
let mut is_reconnecting = self.is_reconnecting.lock().await;
if *is_reconnecting {
return; }
*is_reconnecting = true;
drop(is_reconnecting);
let client = Arc::clone(&self.client);
let state_mgr = Arc::clone(&self.state_manager);
let config = self.config.clone();
let heartbeat_cfg = self.config.heartbeat.clone();
let heartbeat_mgr = Arc::clone(&self.heartbeat_manager);
let observers = Arc::clone(&self.observers);
let reconnect_handle = Arc::clone(&self.reconnect_handle);
let is_reconnecting_flag = Arc::clone(&self.is_reconnecting);
let handle = tokio::spawn(async move {
loop {
let should_reconnect = {
let client_guard = client.lock().await;
!client_guard.is_connected() && matches!(state_mgr.get_state(), crate::client::connection::ConnectionState::Disconnected)
};
if !should_reconnect {
sleep(Duration::from_secs(1)).await;
continue;
}
info!("Attempting to reconnect...");
state_mgr.set_state(crate::client::connection::ConnectionState::Connecting);
let reconnect_result = {
let mut client_guard = client.lock().await;
client_guard.connect().await
};
match reconnect_result {
Ok(()) => {
info!("Reconnected successfully");
state_mgr.set_state(crate::client::connection::ConnectionState::Connected);
if heartbeat_cfg.enabled {
let mut hb_mgr = heartbeat_mgr.lock().await;
}
{
let observers_guard = observers.lock().unwrap();
for observer in observers_guard.iter() {
observer.on_event(&ConnectionEvent::Connected);
}
}
*is_reconnecting_flag.lock().await = false;
break;
}
Err(e) => {
warn!("Reconnect failed: {}, retrying in {:?}", e, config.reconnect_interval);
state_mgr.set_state(crate::client::connection::ConnectionState::Disconnected);
sleep(config.reconnect_interval).await;
}
}
}
let mut handle_guard = reconnect_handle.lock().await;
*handle_guard = None;
});
let mut handle_guard = self.reconnect_handle.lock().await;
*handle_guard = Some(handle);
}
async fn stop_reconnect(&self) {
let mut handle_guard = self.reconnect_handle.lock().await;
if let Some(handle) = handle_guard.take() {
handle.abort();
}
*self.is_reconnecting.lock().await = false;
}
pub fn add_observer(&self, observer: ArcObserver) {
let mut observers = self.observers.lock().unwrap();
observers.push(observer);
}
pub fn remove_observer(&self, observer: ArcObserver) {
let mut observers = self.observers.lock().unwrap();
observers.retain(|o| !Arc::ptr_eq(o, &observer));
}
fn notify_observers(&self, event: &ConnectionEvent) {
let observers = self.observers.lock().unwrap();
for observer in observers.iter() {
observer.on_event(event);
}
}
pub async fn is_connected(&self) -> bool {
let client = self.client.lock().await;
client.is_connected()
}
pub async fn connection_id(&self) -> Option<String> {
let client = self.client.lock().await;
client.connection_id()
}
pub fn state(&self) -> crate::client::connection::ConnectionState {
self.state_manager.get_state()
}
}