1use std::sync::Arc;
2use tokio::time::{self, Duration};
3use yrs::{sync::Awareness, DeepObservable, Doc, Map, Observable, Transact};
4use mf_collab_client::AwarenessRef;
5
6use mf_collab_client::{provider::WebsocketProvider, types::SyncEvent};
7
8#[tokio::main]
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 let server_url = "ws://127.0.0.1:8080/collaboration"; let room_name = "demo-room";
17
18 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 },
104 yrs::types::Event::Map(_map_event) => {
105 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 let mut counter = 0; loop {
160 tokio::select! {
161
162 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 let node_id = uuid::Uuid::new_v4().to_string();
187
188 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}