pub struct Awareness { /* private fields */ }Expand description
The Awareness class implements a simple shared state protocol that can be used for non-persistent data like awareness information (cursor, username, status, ..). Each client can update its own local state and listen to state changes of remote clients.
Each client is identified by a unique client id (something we borrow from doc.clientID).
A client can override its own state by propagating a message with an increasing timestamp
(clock). If such a message is received, it is applied if the known state of that client is
older than the new state (clock < new_clock). If a client thinks that a remote client is
offline, it may propagate a message with { clock, state: null, client }. If such a message is
received, and the known clock of that client equals the received clock, it will clean the state.
Before a client disconnects, it should propagate a null state with an updated clock.
Implementations§
Source§impl Awareness
impl Awareness
Sourcepub fn new(doc: Doc) -> Awareness
pub fn new(doc: Doc) -> Awareness
Creates a new instance of the Awareness struct, which operates over the given document. The Awareness instance takes full ownership of that document. If necessary, the document can be accessed using either the Awareness::doc or the Awareness::doc_mut method.
Examples found in repository?
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // 初始化日志
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // 从 `collaboration.rs` 测试中获取服务器详细信息
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18 let room_name = "demo-room";
19
20 // 1. 初始化客户端的文档和 awareness 状态
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 生成唯一的用户 ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // 订阅同步事件
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. 连接到服务器
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // 订阅 nodes变更 浅
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // 更新了 标记数组 需要转换成 step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // 更新了 节点属性 需要转换成 step 或者 添加节点
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // 本地客户端
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // 远程客户端
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 使用唯一的用户 ID 避免状态覆盖
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. 事件循环
164 let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166 loop {
167 tokio::select! {
168
169 // 定期添加本地更改以测试发送更新
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 递增计数器
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 添加变化的字段
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // 生成新节点 ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // 简单地插入一个文本值作为节点内容
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // 事务会在 drop 时自动提交
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // 处理 Ctrl-C 以优雅地断开连接
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Sourcepub fn on_update<F>(&self, f: F) -> Subscription
pub fn on_update<F>(&self, f: F) -> Subscription
Registers a callback `f` that is invoked for incoming awareness events, and returns a Subscription for that callback.
Examples found in repository?
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // 初始化日志
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // 从 `collaboration.rs` 测试中获取服务器详细信息
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18 let room_name = "demo-room";
19
20 // 1. 初始化客户端的文档和 awareness 状态
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 生成唯一的用户 ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // 订阅同步事件
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. 连接到服务器
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // 订阅 nodes变更 浅
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // 更新了 标记数组 需要转换成 step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // 更新了 节点属性 需要转换成 step 或者 添加节点
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // 本地客户端
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // 远程客户端
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 使用唯一的用户 ID 避免状态覆盖
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. 事件循环
164 let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166 loop {
167 tokio::select! {
168
169 // 定期添加本地更改以测试发送更新
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 递增计数器
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 添加变化的字段
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // 生成新节点 ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // 简单地插入一个文本值作为节点内容
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // 事务会在 drop 时自动提交
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // 处理 Ctrl-C 以优雅地断开连接
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Sourcepub fn doc(&self) -> &Doc
pub fn doc(&self) -> &Doc
Returns a read-only reference to an underlying Doc.
Examples found in repository?
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // 初始化日志
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // 从 `collaboration.rs` 测试中获取服务器详细信息
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18 let room_name = "demo-room";
19
20 // 1. 初始化客户端的文档和 awareness 状态
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 生成唯一的用户 ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // 订阅同步事件
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. 连接到服务器
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // 订阅 nodes变更 浅
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // 更新了 标记数组 需要转换成 step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // 更新了 节点属性 需要转换成 step 或者 添加节点
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // 本地客户端
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // 远程客户端
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 使用唯一的用户 ID 避免状态覆盖
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. 事件循环
164 let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166 loop {
167 tokio::select! {
168
169 // 定期添加本地更改以测试发送更新
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 递增计数器
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 添加变化的字段
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // 生成新节点 ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // 简单地插入一个文本值作为节点内容
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // 事务会在 drop 时自动提交
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // 处理 Ctrl-C 以优雅地断开连接
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Sourcepub fn doc_mut(&mut self) -> &mut Doc
pub fn doc_mut(&mut self) -> &mut Doc
Returns a read-write reference to an underlying Doc.
Examples found in repository?
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // 初始化日志
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // 从 `collaboration.rs` 测试中获取服务器详细信息
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18 let room_name = "demo-room";
19
20 // 1. 初始化客户端的文档和 awareness 状态
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 生成唯一的用户 ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // 订阅同步事件
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. 连接到服务器
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // 订阅 nodes变更 浅
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // 更新了 标记数组 需要转换成 step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // 更新了 节点属性 需要转换成 step 或者 添加节点
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // 本地客户端
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // 远程客户端
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 使用唯一的用户 ID 避免状态覆盖
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. 事件循环
164 let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166 loop {
167 tokio::select! {
168
169 // 定期添加本地更改以测试发送更新
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 递增计数器
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 添加变化的字段
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // 生成新节点 ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // 简单地插入一个文本值作为节点内容
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // 事务会在 drop 时自动提交
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // 处理 Ctrl-C 以优雅地断开连接
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Sourcepub fn local_state(&self) -> Option<&str>
pub fn local_state(&self) -> Option<&str>
Returns the state of the current Awareness instance as a JSON string, wrapped in an Option.
Sourcepub fn set_local_state<S>(&mut self, json: S)
pub fn set_local_state<S>(&mut self, json: S)
Sets the state of the current Awareness instance to the given JSON string. This state will be replicated to other clients as part of an AwarenessUpdate, and it will trigger an event if an update callback has been registered on the current instance (see: Awareness::on_update).
Examples found in repository?
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // 初始化日志
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // 从 `collaboration.rs` 测试中获取服务器详细信息
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18 let room_name = "demo-room";
19
20 // 1. 初始化客户端的文档和 awareness 状态
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 生成唯一的用户 ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // 订阅同步事件
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. 连接到服务器
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // 订阅 nodes变更 浅
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // 更新了 标记数组 需要转换成 step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // 更新了 节点属性 需要转换成 step 或者 添加节点
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // 本地客户端
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // 远程客户端
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 使用唯一的用户 ID 避免状态覆盖
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. 事件循环
164 let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166 loop {
167 tokio::select! {
168
169 // 定期添加本地更改以测试发送更新
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 递增计数器
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 添加变化的字段
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 另一个变化字段
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // 生成新节点 ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // 简单地插入一个文本值作为节点内容
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // 事务会在 drop 时自动提交
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // 处理 Ctrl-C 以优雅地断开连接
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Sourcepub fn remove_state(&mut self, client_id: u64)
pub fn remove_state(&mut self, client_id: u64)
Clears out a state of a given client, effectively marking it as disconnected.
Sourcepub fn clean_local_state(&mut self)
pub fn clean_local_state(&mut self)
Clears out a state of a current client (see: Awareness::client_id), effectively marking it as disconnected.
Sourcepub fn update(&self) -> Result<AwarenessUpdate, Error>
pub fn update(&self) -> Result<AwarenessUpdate, Error>
Returns a serializable update object which is a representation of the current Awareness state.
Sourcepub fn update_with_clients<I>(
&self,
clients: I,
) -> Result<AwarenessUpdate, Error>where
I: IntoIterator<Item = u64>,
pub fn update_with_clients<I>(
&self,
clients: I,
) -> Result<AwarenessUpdate, Error>where
I: IntoIterator<Item = u64>,
Returns a serializable update object which is a representation of the current Awareness state. Unlike Awareness::update, this method variant allows preparing an update for only a subset of known clients. These clients must all be known to the current Awareness instance; otherwise an Error::ClientNotFound error will be returned.
Sourcepub fn apply_update(&mut self, update: AwarenessUpdate) -> Result<(), Error>
pub fn apply_update(&mut self, update: AwarenessUpdate) -> Result<(), Error>
Applies an update (incoming from a remote channel or generated using the Awareness::update / Awareness::update_with_clients methods) and modifies the state of the current instance.
If an update callback has been registered on the current instance (see: Awareness::on_update), applied changes will also be emitted as events.
Sourcepub fn apply_update_summary(
&mut self,
update: AwarenessUpdate,
) -> Result<Option<AwarenessUpdateSummary>, Error>
pub fn apply_update_summary( &mut self, update: AwarenessUpdate, ) -> Result<Option<AwarenessUpdateSummary>, Error>
Applies an update (incoming from a remote channel or generated using the Awareness::update / Awareness::update_with_clients methods) and modifies the state of the current instance. Returns an AwarenessUpdateSummary object describing the changes that were applied.
If an update callback has been registered on the current instance (see: Awareness::on_update), applied changes will also be emitted as events.
Trait Implementations§
impl Send for Awareness
impl Sync for Awareness
Auto Trait Implementations§
impl !Freeze for Awareness
impl !RefUnwindSafe for Awareness
impl Unpin for Awareness
impl !UnwindSafe for Awareness
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more