use yrs::sync::{Message, SyncMessage};
use yrs::updates::encoder::Encode;
use yrs::{Subscription};
use url::Url;
use yrs_warp::AwarenessRef;
use crate::types::*;
use crate::conn::ClientConn;
use futures_util::SinkExt;
pub struct WebsocketProvider {
pub server_url: String,
pub room_name: String,
pub awareness: AwarenessRef,
client_conn: Option<ClientConn>,
pub status: ConnectionStatus,
pub ws_reconnect_attempts: u32,
pub max_backoff_time: u64,
pub ws_url: Option<Url>,
pub client_id: u64,
subscriptions:Vec<Subscription>
}
impl WebsocketProvider {
pub async fn new(
server_url: String,
room_name: String,
awareness: AwarenessRef,
) -> Self {
tracing::info!("Creating new WebsocketProvider");
Self::new_with_options(
server_url,
room_name,
awareness,
WebsocketProviderOptions::default(),
)
.await
}
pub async fn new_with_options(
server_url: String,
room_name: String,
awareness: AwarenessRef,
options: WebsocketProviderOptions,
) -> Self {
let ws_url = Url::parse(&format!(
"{}/{}",
server_url.trim_end_matches('/'),
room_name
))
.ok();
let client_id = awareness.read().await.doc().client_id();
Self {
client_id,
server_url,
room_name,
awareness,
client_conn: None,
status: ConnectionStatus::Disconnected,
ws_reconnect_attempts: 0,
max_backoff_time: options.max_backoff_time,
ws_url,
subscriptions:Vec::new()
}
}
pub fn subscription(&mut self,subscription: Subscription){
self.subscriptions.push(subscription);
}
pub async fn connect(&mut self) {
if self.status == ConnectionStatus::Connected
|| self.status == ConnectionStatus::Connecting
{
return;
}
self.setup_connection().await;
}
async fn setup_connection(&mut self) {
self.status = ConnectionStatus::Connecting;
let ws_url = match &self.ws_url {
Some(url) => url.as_str(),
None => {
self.status = ConnectionStatus::Disconnected;
return;
},
};
let client_conn =
match ClientConn::connect(ws_url, self.awareness.clone()).await {
Ok(conn) => conn,
Err(e) => {
self.status = ConnectionStatus::Disconnected;
self.ws_reconnect_attempts += 1;
tracing::error!("ClientConn connect error: {}", e);
return;
},
};
self.client_conn = Some(client_conn);
self.status = ConnectionStatus::Connected;
self.ws_reconnect_attempts = 0;
self.setup_update_listeners().await;
}
async fn setup_update_listeners(&mut self) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let doc_subscription = {
let sink = self.client_conn.as_ref().unwrap().0.sink();
let client_id = self.client_id.clone();
let awareness_lock = self.awareness.read().await;
let doc = awareness_lock.doc();
doc.observe_update_v1(move |txn, event| {
let origin = txn.origin();
if let Some(origin_ref) = origin {
let origin_bytes = origin_ref.as_ref();
if let Ok(origin_str) = std::str::from_utf8(origin_bytes) {
let update = event.update.to_owned();
if origin_str == client_id.to_string() {
let sink_weak = sink.clone();
tokio::spawn(async move {
let msg =
Message::Sync(SyncMessage::Update(update))
.encode_v1();
let binding = sink_weak.upgrade().unwrap();
let mut sink = binding.lock().await;
sink.send(msg).await.unwrap();
});
}
}
}
})
};
if let Ok(subscription) = doc_subscription {
self.subscriptions.push(subscription);
}
{
let awareness_lock = self.awareness.write().await;
let sink = self.client_conn.as_ref().unwrap().0.sink();
let awareness_subscription =
awareness_lock.on_update(move |event| {
let awareness_update = event.awareness_update().unwrap();
let sink_weak = sink.clone();
tokio::spawn(async move {
let msg: Vec<u8> =
Message::Awareness(awareness_update).encode_v1();
let binding = sink_weak.upgrade().unwrap();
let mut sink = binding.lock().await;
sink.send(msg).await.unwrap();
});
});
self.subscriptions.push(awareness_subscription);
tracing::info!("✅ 本地 Awareness 变更监听器已设置");
}
}
pub async fn disconnect(&mut self) {
tracing::info!("🔌 断开 WebSocket 连接...");
self.client_conn = None;
self.status = ConnectionStatus::Disconnected;
tracing::info!("✅ WebSocket 连接已断开");
}
pub fn is_connected(&self) -> bool {
self.status == ConnectionStatus::Connected && self.client_conn.is_some()
}
pub fn get_status(&self) -> &ConnectionStatus {
&self.status
}
}
impl Drop for WebsocketProvider {
fn drop(&mut self) {
tracing::debug!("🧹 WebsocketProvider 已清理");
}
}