pub struct WebsocketProvider {
pub server_url: String,
pub room_name: String,
pub awareness: AwarenessRef,
pub status: ConnectionStatus,
pub ws_reconnect_attempts: u32,
pub max_backoff_time: u64,
pub ws_url: Option<Url>,
pub client_id: u64,
/* private fields */
}
Fields§
§server_url: String
§room_name: String
§awareness: AwarenessRef
§status: ConnectionStatus
§ws_reconnect_attempts: u32
§max_backoff_time: u64
§ws_url: Option<Url>
§client_id: u64
Implementations§
Source§impl WebsocketProvider
impl WebsocketProvider
Source
pub async fn new(
server_url: String,
room_name: String,
awareness: AwarenessRef,
) -> Self
pub async fn new( server_url: String, room_name: String, awareness: AwarenessRef, ) -> Self
Examples found in repository?
examples/client.rs (lines 31-35)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
Source
pub fn subscription(&mut self, subscription: Subscription)
Examples found in repository?
examples/client.rs (lines 79-97)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
Source
pub async fn connect(&mut self)
Examples found in repository?
examples/client.rs (line 75)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
pub async fn connect_with_retry(&mut self, config: Option<ConnectionRetryConfig>) -> Result<()>
Sourcepub async fn check_server_availability(&self) -> bool
pub async fn check_server_availability(&self) -> bool
检查服务端是否可用
pub async fn smart_connect(&mut self) -> Result<()>
Sourcepub async fn wait_for_protocol_sync(&self, timeout_ms: u64) -> Result<bool>
pub async fn wait_for_protocol_sync(&self, timeout_ms: u64) -> Result<bool>
等待协议级同步完成(包括空房间)
Sourcepub async fn get_protocol_sync_state(&self) -> Option<ProtocolSyncState>
pub async fn get_protocol_sync_state(&self) -> Option<ProtocolSyncState>
获取协议同步状态
Sourcepub fn subscribe_sync_events(&mut self) -> Option<SyncEventReceiver>
pub fn subscribe_sync_events(&mut self) -> Option<SyncEventReceiver>
订阅同步事件
Examples found in repository?
examples/client.rs (line 39)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
Source
pub async fn disconnect(&mut self)
断开连接并清理资源
Examples found in repository?
examples/client.rs (line 200)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
Source
pub fn is_connected(&self) -> bool
检查连接状态
Examples found in repository?
examples/client.rs (line 164)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11 // 初始化日志
12 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14 // 从 `collaboration.rs` 测试中获取服务器详细信息
15 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16 let room_name = "demo-room";
17
18 // 1. 初始化客户端的文档和 awareness 状态
19 let doc = Doc::new();
20 let client_id = doc.client_id();
21 let awareness: AwarenessRef =
22 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24 // 🔍 生成唯一的用户 ID
25 let unique_user_id =
26 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28 tracing::info!("🆔 Yrs Client ID: {}", client_id);
29 tracing::info!("👤 User ID: {}", unique_user_id);
30
31 let mut provider = WebsocketProvider::new(
32 server_url.to_string(),
33 room_name.to_string(),
34 awareness.clone(),
35 )
36 .await;
37
38 // 订阅同步事件
39 if let Some(mut receiver) = provider.subscribe_sync_events() {
40 tokio::spawn(async move {
41 while let Ok(event) = receiver.recv().await {
42 match event {
43 SyncEvent::InitialSyncCompleted {
44 has_data,
45 elapsed_ms,
46 } => {
47 if has_data {
48 println!(
49 "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50 );
51 } else {
52 println!(
53 "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54 );
55 }
56 },
57 SyncEvent::ProtocolStateChanged(state) => {
58 println!("📡 协议状态: {state:?}");
59 },
60 SyncEvent::DataReceived => {
61 println!("📥 收到数据更新");
62 },
63 SyncEvent::ConnectionFailed(error) => {
64 println!("🔌 监听: {error:?}");
65 },
66 SyncEvent::ConnectionChanged(status) => {
67 println!("🔌 连接状态: {status:?}");
68 },
69 }
70 }
71 });
72 }
73
74 // 3. 连接到服务器
75 provider.connect().await;
76 {
77 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78 // 订阅 nodes变更 浅
79 provider.subscription(nodes_map.observe(move |txn, event| {
80 for (key, change) in event.keys(txn) {
81 match change {
82 yrs::types::EntryChange::Inserted(value) => {
83 println!("新增 key: {key}, value: {value:?}");
84 },
85 yrs::types::EntryChange::Removed(old_value) => {
86 println!(
87 "删除 key: {key}, old value: {old_value:?}"
88 );
89 },
90 yrs::types::EntryChange::Updated(old_value, new_value) => {
91 println!(
92 "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93 );
94 },
95 }
96 }
97 }));
98 provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99 for event in events.iter() {
100 match event {
101 yrs::types::Event::Array(_array_event) => {
102 // 更新了 标记数组 需要转换成 step
103 },
104 yrs::types::Event::Map(_map_event) => {
105 // 更新了 节点属性 需要转换成 step 或者 添加节点
106 },
107 _ => {},
108 }
109 }
110 }));
111 }
112 {
113 let client_id_ref = client_id;
114 let mut awareness_lock = awareness.write().await;
115 provider.subscription(awareness_lock.on_update(move |event| {
116 println!("📡 awareness update: {:?}", event.awareness_state());
117 let states = event.awareness_state();
118 for client_id in states.all_clients() {
119 let meta: &yrs::sync::awareness::MetaClientState =
120 states.get_meta(client_id).unwrap();
121 if client_id == client_id_ref {
122 // 本地客户端
123 println!(
124 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125 client_id, meta.clock, meta.last_updated
126 );
127 } else {
128 // 远程客户端
129 println!(
130 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131 client_id, meta.clock, meta.last_updated
132 );
133 }
134 }
135 }));
136
137 // 🎯 使用唯一的用户 ID 避免状态覆盖
138 awareness_lock.set_local_state(
139 serde_json::json!({
140 "user": {
141 "id": unique_user_id,
142 "name": "用户李兴栋",
143 "color": "#FFEAA7",
144 "online": true,
145 "client_id": client_id,
146 "timestamp": chrono::Utc::now().timestamp()
147 },
148 "online": true
149 })
150 .to_string(),
151 );
152
153 tracing::info!("✅ 设置 awareness 状态完成");
154 }
155
156 // 4. 事件循环
157 let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159 loop {
160 tokio::select! {
161
162 // 定期添加本地更改以测试发送更新
163 _ = time::sleep(Duration::from_secs(3)) => {
164 if provider.is_connected() {
165 counter += 1; // 🔄 递增计数器
166
167 let mut awareness_lock = awareness.write().await;
168 awareness_lock.set_local_state(serde_json::json!({
169 "user": {
170 "id": unique_user_id,
171 "name": "用户李兴栋",
172 "color": "#FFEAA7",
173 "online": true,
174 "client_id": client_id,
175 "timestamp": chrono::Utc::now().timestamp(),
176 "heartbeat": counter // 🔄 添加变化的字段
177 },
178 "online": true,
179 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
180 }).to_string());
181
182 let doc = awareness_lock.doc_mut();
183 let nodes_map = doc.get_or_insert_map("nodes");
184 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185 // 生成新节点 ID
186 let node_id = uuid::Uuid::new_v4().to_string();
187
188 // 简单地插入一个文本值作为节点内容
189 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192 // 事务会在 drop 时自动提交
193 drop(txn);
194 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
195 }
196 }
197 // 处理 Ctrl-C 以优雅地断开连接
198 _ = tokio::signal::ctrl_c() => {
199 tracing::info!("🔌 正在断开连接...");
200 provider.disconnect().await;
201 tracing::info!("✅ 已断开连接。");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208}
Source
pub fn get_status(&self) -> &ConnectionStatus
获取连接状态
Trait Implementations§
Auto Trait Implementations§
impl Freeze for WebsocketProvider
impl !RefUnwindSafe for WebsocketProvider
impl Send for WebsocketProvider
impl Sync for WebsocketProvider
impl Unpin for WebsocketProvider
impl !UnwindSafe for WebsocketProvider
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise. Read more