use crate::common::error::Result;
use crate::common::protocol::Frame;
use crate::client::{ClientConfig, HybridClient, Client};
use crate::transport::events::ConnectionObserver;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct ObserverClientBuilder {
config: ClientConfig,
observer: Option<Arc<dyn ConnectionObserver>>,
event_handler: Option<Arc<dyn crate::client::events::handler::ClientEventHandler>>,
}
impl ObserverClientBuilder {
pub fn new(server_url: impl Into<String>) -> Self {
Self {
config: ClientConfig::new(server_url.into()),
observer: None,
event_handler: None,
}
}
pub fn with_event_handler(mut self, event_handler: Arc<dyn crate::client::events::handler::ClientEventHandler>) -> Self {
self.event_handler = Some(event_handler);
self
}
pub fn with_observer(mut self, observer: Arc<dyn ConnectionObserver>) -> Self {
self.observer = Some(observer);
self
}
pub fn with_protocol(mut self, protocol: crate::common::config_types::TransportProtocol) -> Self {
self.config.transport = protocol;
self
}
pub fn with_protocol_race(mut self, protocols: Vec<crate::common::config_types::TransportProtocol>) -> Self {
self.config = self.config.with_protocol_race(protocols);
self
}
pub fn with_protocol_url(mut self, protocol: crate::common::config_types::TransportProtocol, url: String) -> Self {
self.config = self.config.with_protocol_url(protocol, url);
self
}
pub fn with_user_id(mut self, user_id: String) -> Self {
self.config = self.config.with_user_id(user_id);
self
}
pub fn with_format(mut self, format: crate::common::protocol::SerializationFormat) -> Self {
self.config = self.config.with_format(format);
self
}
pub fn with_compression(mut self, compression: crate::common::compression::CompressionAlgorithm) -> Self {
self.config = self.config.with_compression(compression);
self
}
pub fn with_heartbeat(mut self, heartbeat: crate::common::config_types::HeartbeatConfig) -> Self {
self.config = self.config.with_heartbeat(heartbeat);
self
}
pub fn with_tls(mut self, tls: crate::common::config_types::TlsConfig) -> Self {
self.config = self.config.with_tls(tls);
self
}
pub fn with_connect_timeout(mut self, timeout: std::time::Duration) -> Self {
self.config = self.config.with_connect_timeout(timeout);
self
}
pub fn with_reconnect_interval(mut self, interval: std::time::Duration) -> Self {
self.config = self.config.with_reconnect_interval(interval);
self
}
pub fn with_max_reconnect_attempts(mut self, max: Option<u32>) -> Self {
self.config = self.config.with_max_reconnect_attempts(max);
self
}
pub fn enable_router(mut self) -> Self {
self.config = self.config.enable_router();
self
}
pub fn with_device_info(mut self, device_info: crate::common::device::DeviceInfo) -> Self {
self.config = self.config.with_device_info(device_info);
self
}
pub fn with_token(mut self, token: String) -> Self {
self.config = self.config.with_token(token);
self
}
pub async fn build_with_race(self) -> Result<ObserverClient> {
let observer = self.observer.ok_or_else(|| {
crate::common::error::FlareError::general_error("Observer is required")
})?;
let mut client = HybridClient::connect_with_race(self.config).await?;
let client_arc = Arc::new(Mutex::new(client));
{
let mut client = client_arc.lock().await;
if let Some(event_handler) = self.event_handler {
client.core_mut().set_event_handler(Some(event_handler));
}
}
{
let observer_clone = Arc::clone(&observer);
let mut client = client_arc.lock().await;
client.add_observer(observer_clone);
}
Ok(ObserverClient {
client: client_arc,
observer: Some(observer),
})
}
pub fn build(self) -> Result<ObserverClient> {
let observer = self.observer.ok_or_else(|| {
crate::common::error::FlareError::general_error("Observer is required")
})?;
let mut client = HybridClient::new(self.config)?;
if let Some(event_handler) = self.event_handler {
client.core_mut().set_event_handler(Some(event_handler));
}
let client_arc = Arc::new(Mutex::new(client));
Ok(ObserverClient {
client: client_arc,
observer: Some(observer),
})
}
}
pub struct ObserverClient {
client: Arc<Mutex<HybridClient>>,
observer: Option<Arc<dyn ConnectionObserver>>,
}
impl ObserverClient {
pub async fn connect(&mut self) -> Result<()> {
if let Some(observer) = self.observer.take() {
let mut client = self.client.lock().await;
client.add_observer(observer);
}
let mut client = self.client.lock().await;
client.connect().await
}
pub async fn disconnect(&mut self) -> Result<()> {
let mut client = self.client.lock().await;
client.disconnect().await
}
pub async fn send_frame(&mut self, frame: &Frame) -> Result<()> {
let mut client = self.client.lock().await;
client.send_frame(frame).await
}
pub fn is_connected(&self) -> bool {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.is_connected()
})
}
pub fn connection_id(&self) -> Option<String> {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.connection_id()
})
}
pub fn active_protocol(&self) -> crate::common::config_types::TransportProtocol {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.active_protocol()
})
}
pub fn core(&self) -> Option<std::sync::Arc<crate::client::transports::ClientCore>> {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
None
})
}
}