Skip to main content

client/
client.rs

1use std::sync::Arc;
2use tokio::time::{self, Duration};
3use yrs::{sync::Awareness, DeepObservable, Doc, Map, Observable, Transact};
4use mf_collab_client::AwarenessRef;
5
6use mf_collab_client::{provider::WebsocketProvider, types::SyncEvent};
7
8/// 客户端示例,连接到 `collaboration.rs` 中启动的测试服务器
9#[tokio::main]
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    // 初始化日志
12    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14    // 从 `collaboration.rs` 测试中获取服务器详细信息
15    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16    let room_name = "demo-room";
17
18    // 1. 初始化客户端的文档和 awareness 状态
19    let doc = Doc::new();
20    let client_id = doc.client_id();
21    let awareness: AwarenessRef =
22        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24    // 🔍 生成唯一的用户 ID
25    let unique_user_id =
26        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28    tracing::info!("🆔 Yrs Client ID: {}", client_id);
29    tracing::info!("👤 User ID: {}", unique_user_id);
30
31    let mut provider = WebsocketProvider::new(
32        server_url.to_string(),
33        room_name.to_string(),
34        awareness.clone(),
35    )
36    .await;
37
38    // 订阅同步事件
39    if let Some(mut receiver) = provider.subscribe_sync_events() {
40        tokio::spawn(async move {
41            while let Ok(event) = receiver.recv().await {
42                match event {
43                    SyncEvent::InitialSyncCompleted {
44                        has_data,
45                        elapsed_ms,
46                    } => {
47                        if has_data {
48                            println!(
49                                "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50                            );
51                        } else {
52                            println!(
53                                "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54                            );
55                        }
56                    },
57                    SyncEvent::ProtocolStateChanged(state) => {
58                        println!("📡 协议状态: {state:?}");
59                    },
60                    SyncEvent::DataReceived => {
61                        println!("📥 收到数据更新");
62                    },
63                    SyncEvent::ConnectionFailed(error) => {
64                        println!("🔌 监听: {error:?}");
65                    },
66                    SyncEvent::ConnectionChanged(status) => {
67                        println!("🔌 连接状态: {status:?}");
68                    },
69                }
70            }
71        });
72    }
73
74    // 3. 连接到服务器
75    provider.connect().await;
76    {
77        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78        // 订阅 nodes变更 浅
79        provider.subscription(nodes_map.observe(move |txn, event| {
80            for (key, change) in event.keys(txn) {
81                match change {
82                    yrs::types::EntryChange::Inserted(value) => {
83                        println!("新增 key: {key}, value: {value:?}");
84                    },
85                    yrs::types::EntryChange::Removed(old_value) => {
86                        println!(
87                            "删除 key: {key}, old value: {old_value:?}"
88                        );
89                    },
90                    yrs::types::EntryChange::Updated(old_value, new_value) => {
91                        println!(
92                            "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93                        );
94                    },
95                }
96            }
97        }));
98        provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99            for event in events.iter() {
100                match event {
101                    yrs::types::Event::Array(_array_event) => {
102                        // 更新了 标记数组 需要转换成 step
103                    },
104                    yrs::types::Event::Map(_map_event) => {
105                        // 更新了 节点属性 需要转换成 step 或者 添加节点
106                    },
107                    _ => {},
108                }
109            }
110        }));
111    }
112    {
113        let client_id_ref = client_id;
114        let mut awareness_lock = awareness.write().await;
115        provider.subscription(awareness_lock.on_update(move |event| {
116            println!("📡 awareness update: {:?}", event.awareness_state());
117            let states = event.awareness_state();
118            for client_id in states.all_clients() {
119                let meta: &yrs::sync::awareness::MetaClientState =
120                    states.get_meta(client_id).unwrap();
121                if client_id == client_id_ref {
122                    // 本地客户端
123                    println!(
124                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125                        client_id, meta.clock, meta.last_updated
126                    );
127                } else {
128                    // 远程客户端
129                    println!(
130                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131                        client_id, meta.clock, meta.last_updated
132                    );
133                }
134            }
135        }));
136
137        // 🎯 使用唯一的用户 ID 避免状态覆盖
138        awareness_lock.set_local_state(
139            serde_json::json!({
140                "user": {
141                    "id": unique_user_id,
142                    "name": "用户李兴栋",
143                    "color": "#FFEAA7",
144                    "online": true,
145                    "client_id": client_id,
146                    "timestamp": chrono::Utc::now().timestamp()
147                },
148                "online": true
149            })
150            .to_string(),
151        );
152
153        tracing::info!("✅ 设置 awareness 状态完成");
154    }
155
156    // 4. 事件循环
157    let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159    loop {
160        tokio::select! {
161
162            // 定期添加本地更改以测试发送更新
163            _ = time::sleep(Duration::from_secs(3)) => {
164                if provider.is_connected() {
165                    counter += 1; // 🔄 递增计数器
166
167                    let mut awareness_lock = awareness.write().await;
168                    awareness_lock.set_local_state(serde_json::json!({
169                        "user": {
170                            "id": unique_user_id,
171                            "name": "用户李兴栋",
172                            "color": "#FFEAA7",
173                            "online": true,
174                            "client_id": client_id,
175                            "timestamp": chrono::Utc::now().timestamp(),
176                            "heartbeat": counter  // 🔄 添加变化的字段
177                        },
178                        "online": true,
179                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
180                    }).to_string());
181
182                    let doc = awareness_lock.doc_mut();
183                    let nodes_map = doc.get_or_insert_map("nodes");
184                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185                    // 生成新节点 ID
186            let node_id = uuid::Uuid::new_v4().to_string();
187
188            // 简单地插入一个文本值作为节点内容
189            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192            // 事务会在 drop 时自动提交
193            drop(txn);
194                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195             }
196            }
197            // 处理 Ctrl-C 以优雅地断开连接
198            _ = tokio::signal::ctrl_c() => {
199                tracing::info!("🔌 正在断开连接...");
200                provider.disconnect().await;
201                tracing::info!("✅ 已断开连接。");
202                break;
203            }
204        }
205    }
206
207    Ok(())
208}