use crate::common::error::Result;
use crate::common::protocol::Frame;
use crate::client::{ClientConfig, HybridClient, Client};
use crate::transport::events::{ConnectionEvent, ConnectionObserver};
use std::sync::Arc;
use tokio::sync::Mutex;
pub type ClientMessageHandler = Box<dyn Fn(&Frame) -> Result<()> + Send + Sync>;
pub type ClientEventHandler = Box<dyn Fn(&ConnectionEvent) + Send + Sync>;
struct SimpleClientObserver {
message_handler: Option<ClientMessageHandler>,
event_handler: Option<ClientEventHandler>,
}
impl ConnectionObserver for SimpleClientObserver {
fn on_event(&self, event: &ConnectionEvent) {
match event {
ConnectionEvent::Message(data) => {
if let Ok(frame) = crate::common::MessageParser::new(
crate::common::protocol::SerializationFormat::Protobuf,
crate::common::compression::CompressionAlgorithm::None,
).parse(data) {
if let Some(ref handler) = self.message_handler {
if let Err(e) = handler(&frame) {
tracing::error!("消息处理错误: {:?}", e);
}
}
}
}
_ => {
if let Some(ref handler) = self.event_handler {
handler(event);
}
}
}
}
}
pub struct SimpleClient {
client: Arc<Mutex<HybridClient>>,
observer: Arc<SimpleClientObserver>,
}
impl SimpleClient {
pub async fn connect(&mut self) -> Result<()> {
{
let mut client = self.client.lock().await;
client.add_observer(self.observer.clone() as Arc<dyn ConnectionObserver>);
}
let mut client = self.client.lock().await;
client.connect().await
}
pub async fn disconnect(&mut self) -> Result<()> {
let mut client = self.client.lock().await;
client.disconnect().await
}
pub async fn send_frame(&mut self, frame: &Frame) -> Result<()> {
let mut client = self.client.lock().await;
client.send_frame(frame).await
}
pub fn is_connected(&self) -> bool {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.is_connected()
})
}
pub fn connection_id(&self) -> Option<String> {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.connection_id()
})
}
pub fn active_protocol(&self) -> crate::common::config_types::TransportProtocol {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
client.active_protocol()
})
}
pub fn core(&self) -> Option<std::sync::Arc<crate::client::transports::ClientCore>> {
tokio::task::block_in_place(|| {
let client = self.client.blocking_lock();
None
})
}
}
pub struct ClientBuilder {
config: ClientConfig,
message_handler: Option<ClientMessageHandler>,
event_handler: Option<ClientEventHandler>,
}
impl ClientBuilder {
pub fn new(server_url: impl Into<String>) -> Self {
Self {
config: ClientConfig::new(server_url.into()),
message_handler: None,
event_handler: None,
}
}
pub fn on_message<F>(mut self, handler: F) -> Self
where
F: Fn(&Frame) -> Result<()> + Send + Sync + 'static,
{
self.message_handler = Some(Box::new(handler));
self
}
pub fn on_event<F>(mut self, handler: F) -> Self
where
F: Fn(&ConnectionEvent) + Send + Sync + 'static,
{
self.event_handler = Some(Box::new(handler));
self
}
pub fn with_protocol(mut self, protocol: crate::common::config_types::TransportProtocol) -> Self {
self.config.transport = protocol;
self
}
pub fn with_protocol_race(mut self, protocols: Vec<crate::common::config_types::TransportProtocol>) -> Self {
self.config = self.config.with_protocol_race(protocols);
self
}
pub fn with_protocol_url(mut self, protocol: crate::common::config_types::TransportProtocol, url: String) -> Self {
self.config = self.config.with_protocol_url(protocol, url);
self
}
pub fn with_user_id(mut self, user_id: String) -> Self {
self.config = self.config.with_user_id(user_id);
self
}
pub fn with_token(mut self, token: String) -> Self {
self.config = self.config.with_token(token);
self
}
pub fn with_format(mut self, format: crate::common::protocol::SerializationFormat) -> Self {
self.config = self.config.with_format(format);
self
}
pub fn with_compression(mut self, compression: crate::common::compression::CompressionAlgorithm) -> Self {
self.config = self.config.with_compression(compression);
self
}
pub fn with_heartbeat(mut self, heartbeat: crate::common::config_types::HeartbeatConfig) -> Self {
self.config = self.config.with_heartbeat(heartbeat);
self
}
pub fn with_tls(mut self, tls: crate::common::config_types::TlsConfig) -> Self {
self.config = self.config.with_tls(tls);
self
}
pub fn with_connect_timeout(mut self, timeout: std::time::Duration) -> Self {
self.config = self.config.with_connect_timeout(timeout);
self
}
pub fn with_reconnect_interval(mut self, interval: std::time::Duration) -> Self {
self.config = self.config.with_reconnect_interval(interval);
self
}
pub fn with_max_reconnect_attempts(mut self, max: Option<u32>) -> Self {
self.config = self.config.with_max_reconnect_attempts(max);
self
}
pub fn enable_router(mut self) -> Self {
self.config = self.config.enable_router();
self
}
pub fn build(self) -> Result<SimpleClient> {
let observer = Arc::new(SimpleClientObserver {
message_handler: self.message_handler,
event_handler: self.event_handler,
});
let client = HybridClient::new(self.config)?;
let client_arc = Arc::new(Mutex::new(client));
Ok(SimpleClient {
client: client_arc,
observer,
})
}
}