use std::sync::Arc;
use tokio::time::{self, Duration};
use yrs::{sync::Awareness, DeepObservable, Doc, Map, Observable, Transact};
use mf_collab_client::AwarenessRef;
use tracing_subscriber;
use serde_json;
use mf_collab_client::{provider::WebsocketProvider, types::SyncEvent};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
let server_url = "ws://127.0.0.1:8080/collaboration"; let room_name = "demo-room";
let doc = Doc::new();
let client_id = doc.client_id();
let awareness: AwarenessRef =
Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
let unique_user_id =
format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
tracing::info!("🆔 Yrs Client ID: {}", client_id);
tracing::info!("👤 User ID: {}", unique_user_id);
let mut provider = WebsocketProvider::new(
server_url.to_string(),
room_name.to_string(),
awareness.clone(),
)
.await;
if let Some(mut receiver) = provider.subscribe_sync_events() {
tokio::spawn(async move {
while let Ok(event) = receiver.recv().await {
match event {
SyncEvent::InitialSyncCompleted {
has_data,
elapsed_ms,
} => {
if has_data {
println!(
"🎉 同步完成,房间有数据!耗时: {}ms",
elapsed_ms
);
} else {
println!(
"📭 同步完成,空房间!耗时: {}ms",
elapsed_ms
);
}
},
SyncEvent::ProtocolStateChanged(state) => {
println!("📡 协议状态: {:?}", state);
},
SyncEvent::DataReceived => {
println!("📥 收到数据更新");
},
SyncEvent::ConnectionFailed(error) => {
println!("🔌 监听: {:?}", error);
},
SyncEvent::ConnectionChanged(status) => {
println!("🔌 连接状态: {:?}", status);
},
_ => {},
}
}
});
}
provider.connect().await;
{
let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
provider.subscription(nodes_map.observe(move |txn, event| {
for (key, change) in event.keys(txn) {
match change {
yrs::types::EntryChange::Inserted(value) => {
println!("新增 key: {}, value: {:?}", key, value);
},
yrs::types::EntryChange::Removed(old_value) => {
println!(
"删除 key: {}, old value: {:?}",
key, old_value
);
},
yrs::types::EntryChange::Updated(old_value, new_value) => {
println!(
"更新 key: {}, old: {:?}, new: {:?}",
key, old_value, new_value
);
},
}
}
}));
provider.subscription(nodes_map.observe_deep(move |txn, events| {
for event in events.iter() {
match event {
yrs::types::Event::Array(array_event) => {
},
yrs::types::Event::Map(map_event) => {
},
_ => {},
}
}
}));
}
{
let client_id_ref = client_id;
let mut awareness_lock = awareness.write().await;
provider.subscription(awareness_lock.on_update(move |event| {
println!("📡 awareness update: {:?}", event.awareness_state());
let states = event.awareness_state();
for client_id in states.all_clients() {
let meta: &yrs::sync::awareness::MetaClientState =
states.get_meta(client_id).unwrap();
if client_id == client_id_ref {
println!(
"🏠 本地客户端 {}: clock={}, last_updated={:?}",
client_id, meta.clock, meta.last_updated
);
} else {
println!(
"🌐 远程客户端 {}: clock={}, last_updated={:?}",
client_id, meta.clock, meta.last_updated
);
}
}
}));
awareness_lock.set_local_state(
serde_json::json!({
"user": {
"id": unique_user_id,
"name": "用户李兴栋",
"color": "#FFEAA7",
"online": true,
"client_id": client_id,
"timestamp": chrono::Utc::now().timestamp()
},
"online": true
})
.to_string(),
);
tracing::info!("✅ 设置 awareness 状态完成");
}
let mut counter = 0;
loop {
tokio::select! {
_ = time::sleep(Duration::from_secs(3)) => {
if provider.is_connected() {
counter += 1;
let mut awareness_lock = awareness.write().await;
awareness_lock.set_local_state(serde_json::json!({
"user": {
"id": unique_user_id,
"name": "用户李兴栋",
"color": "#FFEAA7",
"online": true,
"client_id": client_id,
"timestamp": chrono::Utc::now().timestamp(),
"heartbeat": counter },
"online": true,
"last_activity": chrono::Utc::now().timestamp() }).to_string());
let doc = awareness_lock.doc_mut();
let nodes_map = doc.get_or_insert_map("nodes");
let mut txn = doc.transact_mut_with(doc.client_id().to_string());
let node_id = uuid::Uuid::new_v4().to_string();
let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
drop(txn);
tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("🔌 正在断开连接...");
provider.disconnect().await;
tracing::info!("✅ 已断开连接。");
break;
}
}
}
Ok(())
}