use std::sync::Arc;
use tokio::time::{self, Duration};
use yrs::{sync::Awareness, Doc, Map, Observable, Transact};
use yrs_warp::AwarenessRef;
use tracing_subscriber;
use serde_json;
use mf_collab_client::{provider::WebsocketProvider};
/// Demo collaboration client.
///
/// Connects a [`WebsocketProvider`] to the `demo-room` room on
/// `ws://127.0.0.1:8080/collaboration`, logs every change to the shared
/// `"nodes"` map and every awareness update, and then, every three seconds
/// while connected, refreshes the local awareness state and inserts a freshly
/// generated node into the `"nodes"` map. Pressing Ctrl-C disconnects the
/// provider and ends the program.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();

    let endpoint = "ws://127.0.0.1:8080/collaboration";
    let room = "demo-room";

    // The client id has to be read before the document is moved into `Awareness`.
    let doc = Doc::new();
    let client_id = doc.client_id();
    let awareness: AwarenessRef = Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));

    let user_id = format!(
        "client-user-{}-{}",
        chrono::Utc::now().timestamp(),
        client_id
    );
    tracing::info!("🆔 Yrs Client ID: {}", client_id);
    tracing::info!("👤 User ID: {}", user_id);

    let mut provider =
        WebsocketProvider::new(endpoint.to_string(), room.to_string(), awareness.clone()).await;
    provider.connect().await;

    // Print every insert / remove / update on the shared "nodes" map. The
    // subscription is handed to the provider, which keeps it.
    let nodes = awareness.read().await.doc().get_or_insert_map("nodes");
    let nodes_subscription = nodes.observe(move |txn, event| {
        for (key, change) in event.keys(txn) {
            match change {
                yrs::types::EntryChange::Inserted(value) => {
                    println!("新增 key: {}, value: {:?}", key, value);
                }
                yrs::types::EntryChange::Removed(old_value) => {
                    println!("删除 key: {}, old value: {:?}", key, old_value);
                }
                yrs::types::EntryChange::Updated(old_value, new_value) => {
                    println!("更新 key: {}, old: {:?}, new: {:?}", key, old_value, new_value);
                }
            }
        }
    });
    provider.subscription(nodes_subscription);

    // Scoped so the write guard is released before the main loop asks for it again.
    {
        let mut guard = awareness.write().await;

        // Print the clock and last-update time of every known client,
        // labelling our own entry separately from remote ones.
        let awareness_subscription = guard.on_update(move |event| {
            println!("📡 awareness update: {:?}", event.awareness_state());
            let states = event.awareness_state();
            for peer in states.all_clients() {
                let meta: &yrs::sync::awareness::MetaClientState = states.get_meta(peer).unwrap();
                if peer == client_id {
                    println!(
                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
                        peer, meta.clock, meta.last_updated
                    );
                } else {
                    println!(
                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
                        peer, meta.clock, meta.last_updated
                    );
                }
            }
        });
        provider.subscription(awareness_subscription);

        // Publish the initial presence information for this client.
        let initial_state = serde_json::json!({
            "user": {
                "id": user_id,
                "name": "用户李兴栋",
                "color": "#FFEAA7",
                "online": true,
                "client_id": client_id,
                "timestamp": chrono::Utc::now().timestamp()
            },
            "online": true
        });
        guard.set_local_state(initial_state.to_string());
        tracing::info!("✅ 设置 awareness 状态完成");
    }

    let mut heartbeat = 0;
    loop {
        tokio::select! {
            _ = time::sleep(Duration::from_secs(3)) => {
                // Nothing to publish while the provider is offline.
                if !provider.is_connected() {
                    continue;
                }
                heartbeat += 1;

                let mut guard = awareness.write().await;

                // Refresh presence, now including the heartbeat counter.
                let state = serde_json::json!({
                    "user": {
                        "id": user_id,
                        "name": "用户李兴栋",
                        "color": "#FFEAA7",
                        "online": true,
                        "client_id": client_id,
                        "timestamp": chrono::Utc::now().timestamp(),
                        "heartbeat": heartbeat
                    },
                    "online": true,
                    "last_activity": chrono::Utc::now().timestamp()
                });
                guard.set_local_state(state.to_string());

                // Insert a new node, keyed by a random UUID, into the shared map.
                let node_id = uuid::Uuid::new_v4().to_string();
                let payload = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
                let doc = guard.doc_mut();
                let nodes = doc.get_or_insert_map("nodes");
                {
                    // The transaction is tagged with this client's id as its origin and
                    // ends with this scope (presumably committing the change).
                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
                    nodes.insert(&mut txn, node_id.as_str(), payload.as_str());
                }
                tracing::info!("📝 已发送本地文档更改,heartbeat: {}", heartbeat);
            }
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("🔌 正在断开连接...");
                provider.disconnect().await;
                tracing::info!("✅ 已断开连接。");
                break;
            }
        }
    }

    Ok(())
}