Struct Awareness 

pub struct Awareness { /* private fields */ }

The Awareness struct implements a simple shared-state protocol for non-persistent data such as awareness information (cursor, username, status, ...). Each client can update its own local state and listen for state changes of remote clients.

Each client is identified by a unique client ID (borrowed from the underlying Doc). A client can overwrite its own state by propagating a message with an increased timestamp (clock). When such a message is received, it is applied only if the known state of that client is older than the new state (clock < new_clock). If a client believes that a remote client is offline, it may propagate a message of the form { clock, state: null, client }. When such a message is received and the known clock of that client equals the received clock, the state of that client is removed.

Before a client disconnects, it should propagate a null state with an updated clock.
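
The clock rules above can be sketched with a small standalone model. This is only an illustration under stated assumptions: `Message`, `StateTable`, `apply` and the `u32` clock type are hypothetical names and choices made for this sketch, not items exported by the crate, and the real implementation may track more metadata.

```rust
use std::collections::HashMap;

/// One awareness message of the form `{ clock, state, client }`.
/// `state: None` plays the role of the `null` state. (Hypothetical type.)
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub client: u64,
    pub clock: u32,
    pub state: Option<String>,
}

/// Hypothetical per-client bookkeeping: last known clock and JSON state.
#[derive(Debug, Default)]
pub struct StateTable {
    clocks: HashMap<u64, u32>,
    states: HashMap<u64, String>,
}

impl StateTable {
    /// Applies a message and reports whether the stored state changed.
    pub fn apply(&mut self, msg: Message) -> bool {
        let known = self.clocks.get(&msg.client).copied();
        match msg.state {
            Some(state) => {
                // Accept only if this client is unknown or `clock < new_clock`.
                if known.map_or(true, |clock| clock < msg.clock) {
                    self.clocks.insert(msg.client, msg.clock);
                    self.states.insert(msg.client, state);
                    true
                } else {
                    false
                }
            }
            None => {
                // A null state is accepted with a newer clock (the client
                // announcing its own disconnect) or with an equal clock
                // (another client reporting it as offline).
                if known.map_or(true, |clock| clock <= msg.clock) {
                    self.clocks.insert(msg.client, msg.clock);
                    self.states.remove(&msg.client).is_some()
                } else {
                    false
                }
            }
        }
    }

    /// Returns the JSON state currently known for `client`, if any.
    pub fn state(&self, client: u64) -> Option<&str> {
        self.states.get(&client).map(String::as_str)
    }
}
```

In this model a repeated or stale message is a no-op, so re-delivery by the transport does not disturb the table.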

Implementations

impl Awareness

pub fn new(doc: Doc) -> Awareness

Creates a new Awareness instance that operates over the given document. The Awareness instance takes full ownership of that document. If necessary, the document can be accessed through the Awareness::doc or Awareness::doc_mut methods.

Examples found in repository?
examples/client.rs (line 22)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize logging
    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();

    // Server details taken from the `collaboration.rs` test
    let server_url = "ws://127.0.0.1:8080/collaboration"; // make sure the port matches the server test
    let room_name = "demo-room";

    // 1. Initialize the client's document and awareness state
    let doc = Doc::new();
    let client_id = doc.client_id();
    let awareness: AwarenessRef =
        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));

    // 🔍 Generate a unique user ID
    let unique_user_id =
        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);

    tracing::info!("🆔 Yrs Client ID: {}", client_id);
    tracing::info!("👤 User ID: {}", unique_user_id);

    let mut provider = WebsocketProvider::new(
        server_url.to_string(),
        room_name.to_string(),
        awareness.clone(),
    )
    .await;

    // Subscribe to sync events
    if let Some(mut receiver) = provider.subscribe_sync_events() {
        tokio::spawn(async move {
            while let Ok(event) = receiver.recv().await {
                match event {
                    SyncEvent::InitialSyncCompleted {
                        has_data,
                        elapsed_ms,
                    } => {
                        if has_data {
                            println!(
                                "🎉 Sync complete, the room has data! Took: {elapsed_ms}ms"
                            );
                        } else {
                            println!(
                                "📭 Sync complete, the room is empty! Took: {elapsed_ms}ms"
                            );
                        }
                    },
                    SyncEvent::ProtocolStateChanged(state) => {
                        println!("📡 Protocol state: {state:?}");
                    },
                    SyncEvent::DataReceived => {
                        println!("📥 Received data update");
                    },
                    SyncEvent::ConnectionFailed(error) => {
                        println!("🔌 Listener: {error:?}");
                    },
                    SyncEvent::ConnectionChanged(status) => {
                        println!("🔌 Connection status: {status:?}");
                    },
                }
            }
        });
    }

    // 3. Connect to the server
    provider.connect().await;
    {
        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
        // Subscribe to changes on nodes (shallow)
        provider.subscription(nodes_map.observe(move |txn, event| {
            for (key, change) in event.keys(txn) {
                match change {
                    yrs::types::EntryChange::Inserted(value) => {
                        println!("Inserted key: {key}, value: {value:?}");
                    },
                    yrs::types::EntryChange::Removed(old_value) => {
                        println!(
                            "Removed key: {key}, old value: {old_value:?}"
                        );
                    },
                    yrs::types::EntryChange::Updated(old_value, new_value) => {
                        println!(
                            "Updated key: {key}, old: {old_value:?}, new: {new_value:?}"
                        );
                    },
                }
            }
        }));
        provider.subscription(nodes_map.observe_deep(move |_txn, events| {
            for event in events.iter() {
                match event {
                    yrs::types::Event::Array(_array_event) => {
                        // The marks array was updated; this needs to be converted into a step
                    },
                    yrs::types::Event::Map(_map_event) => {
                        // Node attributes were updated; this needs to be converted into a step, or a node added
                    },
                    _ => {},
                }
            }
        }));
    }
    {
        let client_id_ref = client_id;
        let mut awareness_lock = awareness.write().await;
        provider.subscription(awareness_lock.on_update(move |event| {
            println!("📡 awareness update: {:?}", event.awareness_state());
            let states = event.awareness_state();
            for client_id in states.all_clients() {
                let meta: &yrs::sync::awareness::MetaClientState =
                    states.get_meta(client_id).unwrap();
                if client_id == client_id_ref {
                    // Local client
                    println!(
                        "🏠 Local client {}: clock={}, last_updated={:?}",
                        client_id, meta.clock, meta.last_updated
                    );
                } else {
                    // Remote client
                    println!(
                        "🌐 Remote client {}: clock={}, last_updated={:?}",
                        client_id, meta.clock, meta.last_updated
                    );
                }
            }
        }));

        // 🎯 Use the unique user ID to avoid overwriting state
        awareness_lock.set_local_state(
            serde_json::json!({
                "user": {
                    "id": unique_user_id,
                    "name": "用户李兴栋",
                    "color": "#FFEAA7",
                    "online": true,
                    "client_id": client_id,
                    "timestamp": chrono::Utc::now().timestamp()
                },
                "online": true
            })
            .to_string(),
        );

        tracing::info!("✅ Awareness state set");
    }

    // 4. Event loop
    let mut counter = 0; // 🔄 Counter that ensures the state changes

    loop {
        tokio::select! {

            // Periodically add local changes to test sending updates
            _ = time::sleep(Duration::from_secs(3)) => {
                if provider.is_connected() {
                    counter += 1; // 🔄 Increment the counter

                    let mut awareness_lock = awareness.write().await;
                    awareness_lock.set_local_state(serde_json::json!({
                        "user": {
                            "id": unique_user_id,
                            "name": "用户李兴栋",
                            "color": "#FFEAA7",
                            "online": true,
                            "client_id": client_id,
                            "timestamp": chrono::Utc::now().timestamp(),
                            "heartbeat": counter  // 🔄 Field that changes on every tick
                        },
                        "online": true,
                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 Another changing field
                    }).to_string());

                    let doc = awareness_lock.doc_mut();
                    let nodes_map = doc.get_or_insert_map("nodes");
                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
                    // Generate a new node ID
                    let node_id = uuid::Uuid::new_v4().to_string();

                    // Simply insert a text value as the node content
                    let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
                    nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());

                    // The transaction is committed automatically when dropped
                    drop(txn);
                    tracing::info!("📝 Sent local document change, heartbeat: {}", counter);
                }
            }
            // Handle Ctrl-C to disconnect gracefully
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("🔌 Disconnecting...");
                provider.disconnect().await;
                tracing::info!("✅ Disconnected.");
                break;
            }
        }
    }

    Ok(())
}

pub fn on_update<F>(&self, f: F) -> Subscription
where F: Fn(&Event) + 'static,

Registers the callback f, which is invoked with an Event for each awareness update, and returns a Subscription for that registration.

Examples found in repository?
examples/client.rs (lines 115-135)

pub fn doc(&self) -> &Doc

Returns a read-only reference to the underlying Doc.

Examples found in repository?
examples/client.rs (line 77)

pub fn doc_mut(&mut self) -> &mut Doc

Returns a read-write reference to the underlying Doc.

Examples found in repository?
examples/client.rs (line 182)
10async fn main() -> Result<(), Box<dyn std::error::Error>> {
11    // 初始化日志
12    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
13
14    // 从 `collaboration.rs` 测试中获取服务器详细信息
15    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
16    let room_name = "demo-room";
17
18    // 1. 初始化客户端的文档和 awareness 状态
19    let doc = Doc::new();
20    let client_id = doc.client_id();
21    let awareness: AwarenessRef =
22        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
23
24    // 🔍 生成唯一的用户 ID
25    let unique_user_id =
26        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
27
28    tracing::info!("🆔 Yrs Client ID: {}", client_id);
29    tracing::info!("👤 User ID: {}", unique_user_id);
30
31    let mut provider = WebsocketProvider::new(
32        server_url.to_string(),
33        room_name.to_string(),
34        awareness.clone(),
35    )
36    .await;
37
38    // 订阅同步事件
39    if let Some(mut receiver) = provider.subscribe_sync_events() {
40        tokio::spawn(async move {
41            while let Ok(event) = receiver.recv().await {
42                match event {
43                    SyncEvent::InitialSyncCompleted {
44                        has_data,
45                        elapsed_ms,
46                    } => {
47                        if has_data {
48                            println!(
49                                "🎉 同步完成,房间有数据!耗时: {elapsed_ms}ms"
50                            );
51                        } else {
52                            println!(
53                                "📭 同步完成,空房间!耗时: {elapsed_ms}ms"
54                            );
55                        }
56                    },
57                    SyncEvent::ProtocolStateChanged(state) => {
58                        println!("📡 协议状态: {state:?}");
59                    },
60                    SyncEvent::DataReceived => {
61                        println!("📥 收到数据更新");
62                    },
63                    SyncEvent::ConnectionFailed(error) => {
64                        println!("🔌 监听: {error:?}");
65                    },
66                    SyncEvent::ConnectionChanged(status) => {
67                        println!("🔌 连接状态: {status:?}");
68                    },
69                }
70            }
71        });
72    }
73
74    // 3. 连接到服务器
75    provider.connect().await;
76    {
77        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
78        // 订阅 nodes变更 浅
79        provider.subscription(nodes_map.observe(move |txn, event| {
80            for (key, change) in event.keys(txn) {
81                match change {
82                    yrs::types::EntryChange::Inserted(value) => {
83                        println!("新增 key: {key}, value: {value:?}");
84                    },
85                    yrs::types::EntryChange::Removed(old_value) => {
86                        println!(
87                            "删除 key: {key}, old value: {old_value:?}"
88                        );
89                    },
90                    yrs::types::EntryChange::Updated(old_value, new_value) => {
91                        println!(
92                            "更新 key: {key}, old: {old_value:?}, new: {new_value:?}"
93                        );
94                    },
95                }
96            }
97        }));
98        provider.subscription(nodes_map.observe_deep(move |_txn, events| {
99            for event in events.iter() {
100                match event {
101                    yrs::types::Event::Array(_array_event) => {
102                        // 更新了 标记数组 需要转换成 step
103                    },
104                    yrs::types::Event::Map(_map_event) => {
105                        // 更新了 节点属性 需要转换成 step 或者 添加节点
106                    },
107                    _ => {},
108                }
109            }
110        }));
111    }
112    {
113        let client_id_ref = client_id;
114        let mut awareness_lock = awareness.write().await;
115        provider.subscription(awareness_lock.on_update(move |event| {
116            println!("📡 awareness update: {:?}", event.awareness_state());
117            let states = event.awareness_state();
118            for client_id in states.all_clients() {
119                let meta: &yrs::sync::awareness::MetaClientState =
120                    states.get_meta(client_id).unwrap();
121                if client_id == client_id_ref {
122                    // 本地客户端
123                    println!(
124                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
125                        client_id, meta.clock, meta.last_updated
126                    );
127                } else {
128                    // 远程客户端
129                    println!(
130                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
131                        client_id, meta.clock, meta.last_updated
132                    );
133                }
134            }
135        }));
136
137        // 🎯 使用唯一的用户 ID 避免状态覆盖
138        awareness_lock.set_local_state(
139            serde_json::json!({
140                "user": {
141                    "id": unique_user_id,
142                    "name": "用户李兴栋",
143                    "color": "#FFEAA7",
144                    "online": true,
145                    "client_id": client_id,
146                    "timestamp": chrono::Utc::now().timestamp()
147                },
148                "online": true
149            })
150            .to_string(),
151        );
152
153        tracing::info!("✅ 设置 awareness 状态完成");
154    }
155
156    // 4. 事件循环
157    let mut counter = 0; // 🔄 添加计数器确保状态变化
158
159    loop {
160        tokio::select! {
161
162            // 定期添加本地更改以测试发送更新
163            _ = time::sleep(Duration::from_secs(3)) => {
164                if provider.is_connected() {
165                    counter += 1; // 🔄 递增计数器
166
167                    let mut awareness_lock = awareness.write().await;
168                    awareness_lock.set_local_state(serde_json::json!({
169                        "user": {
170                            "id": unique_user_id,
171                            "name": "用户李兴栋",
172                            "color": "#FFEAA7",
173                            "online": true,
174                            "client_id": client_id,
175                            "timestamp": chrono::Utc::now().timestamp(),
176                            "heartbeat": counter  // 🔄 添加变化的字段
177                        },
178                        "online": true,
179                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
180                    }).to_string());
181
182                    let doc = awareness_lock.doc_mut();
183                    let nodes_map = doc.get_or_insert_map("nodes");
184                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
185                    // Generate a new node ID
186                    let node_id = uuid::Uuid::new_v4().to_string();
187
188                    // Simply insert a text value as the node content
189                    let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{node_id}\", \"client\": \"rust_client\"}}");
190                    nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
191
192                    // The transaction is committed automatically on drop
193                    drop(txn);
194                    tracing::info!("📝 Sent local document change, heartbeat: {}", counter);
195                }
196            }
197            // Handle Ctrl-C to disconnect gracefully
198            _ = tokio::signal::ctrl_c() => {
199                tracing::info!("🔌 Disconnecting...");
200                provider.disconnect().await;
201                tracing::info!("✅ Disconnected.");
202                break;
203            }
204        }
205    }
206
207    Ok(())
208}
Source

pub fn client_id(&self) -> u64

Returns the globally unique client ID of the underlying Doc.

Source

pub fn clients(&self) -> &HashMap<u64, String>

Returns a map of the states of all clients tracked by the current Awareness instance, keyed by client ID. Each state is represented, and replicated to other clients, as a JSON string.
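A common use of this map is to separate remote peers from the local client. The sketch below works on a plain `HashMap<u64, String>` of the same shape as the one returned by `clients`. The helper name `remote_states` is hypothetical and not part of this crate.

```rust
use std::collections::HashMap;

/// Returns `(client_id, json_state)` pairs for every client except `local_id`,
/// sorted by client ID so the result is deterministic.
/// Hypothetical helper: it only assumes the map shape documented above.
fn remote_states(clients: &HashMap<u64, String>, local_id: u64) -> Vec<(u64, &str)> {
    let mut remote: Vec<(u64, &str)> = clients
        .iter()
        .filter(|(id, _)| **id != local_id)
        .map(|(id, json)| (*id, json.as_str()))
        .collect();
    remote.sort_by_key(|(id, _)| *id);
    remote
}
```

With a real instance this would presumably be called as `remote_states(awareness.clients(), awareness.client_id())`, since both accessors are documented on this page.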

Source

pub fn local_state(&self) -> Option<&str>

Returns the state of the current Awareness instance (the local client) as a JSON string, or None if no local state is set.

Source

pub fn set_local_state<S>(&mut self, json: S)
where S: Into<String>,

Sets the state of the current Awareness instance to the given JSON string. This state is replicated to other clients as part of an AwarenessUpdate, and an event is emitted if the current instance was created using the Awareness::with_observer method.

Examples found in repository?
examples/client.rs (lines 138-151)
138        awareness_lock.set_local_state(
139            serde_json::json!({
140                "user": {
141                    "id": unique_user_id,
142                    "name": "用户李兴栋",
143                    "color": "#FFEAA7",
144                    "online": true,
145                    "client_id": client_id,
146                    "timestamp": chrono::Utc::now().timestamp()
147                },
148                "online": true
149            })
150            .to_string(),
151        );
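The protocol description at the top of this page says a client overrides its own state by sending a message with an increasing clock, and sends a null state with an updated clock before it disconnects. The following is a minimal sketch of that bookkeeping. `ToyAwareness` and `local_clock` are hypothetical stand-ins written for illustration; they are not the real `Awareness` type or its internals.

```rust
use std::collections::HashMap;

/// Toy stand-in for an awareness instance; NOT the real `Awareness` type.
struct ToyAwareness {
    client_id: u64,
    states: HashMap<u64, String>,
    clocks: HashMap<u64, u32>,
}

impl ToyAwareness {
    fn new(client_id: u64) -> Self {
        ToyAwareness { client_id, states: HashMap::new(), clocks: HashMap::new() }
    }

    /// Stores the JSON string and bumps the local clock so peers accept the change.
    fn set_local_state<S: Into<String>>(&mut self, json: S) {
        let clock = self.clocks.entry(self.client_id).or_insert(0);
        *clock += 1;
        self.states.insert(self.client_id, json.into());
    }

    /// Drops the local state while still bumping the clock (the "null state" message).
    fn clean_local_state(&mut self) {
        let clock = self.clocks.entry(self.client_id).or_insert(0);
        *clock += 1;
        self.states.remove(&self.client_id);
    }

    /// Local state as a JSON string, or `None` if nothing is set.
    fn local_state(&self) -> Option<&str> {
        self.states.get(&self.client_id).map(|s| s.as_str())
    }

    /// Hypothetical accessor used here only to make the clock visible.
    fn local_clock(&self) -> u32 {
        self.clocks.get(&self.client_id).copied().unwrap_or(0)
    }
}
```

In this model every local write, including the final cleanup, advances the clock, which is what lets remote peers tell a fresh state from a stale one.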
Source

pub fn remove_state(&mut self, client_id: u64)

Clears the state of the given client, effectively marking it as disconnected.

Source

pub fn clean_local_state(&mut self)

Clears the state of the current client (see Awareness::client_id), effectively marking it as disconnected.

Source

pub fn update(&self) -> Result<AwarenessUpdate, Error>

Returns a serializable update object that represents the current Awareness state.

Source

pub fn update_with_clients<I>( &self, clients: I, ) -> Result<AwarenessUpdate, Error>
where I: IntoIterator<Item = u64>,

Returns a serializable update object that represents the current Awareness state. Unlike Awareness::update, this variant prepares an update for only a subset of the known clients. All of these clients must be known to the current Awareness instance; otherwise an Error::ClientNotFound error is returned.
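The "all requested clients must be known" rule can be illustrated with a small self-contained sketch. Everything here is an assumption made for illustration: `ToyError` only mirrors the documented `Error::ClientNotFound` (its `u64` payload is a guess), and a `Vec<(u64, String)>` stands in for the real `AwarenessUpdate`.

```rust
use std::collections::HashMap;

/// Hypothetical error type mirroring the documented `Error::ClientNotFound`.
#[derive(Debug, PartialEq)]
enum ToyError {
    ClientNotFound(u64),
}

/// Collects `(client_id, json)` entries for the requested clients,
/// failing on the first ID that is not present in `states`.
fn update_with_clients<I>(
    states: &HashMap<u64, String>,
    clients: I,
) -> Result<Vec<(u64, String)>, ToyError>
where
    I: IntoIterator<Item = u64>,
{
    clients
        .into_iter()
        .map(|id| {
            states
                .get(&id)
                .map(|json| (id, json.clone()))
                .ok_or(ToyError::ClientNotFound(id))
        })
        .collect() // collecting into `Result` stops at the first error
}
```

Requesting only known IDs yields their entries in the order requested; adding a single unknown ID turns the whole call into an error rather than a partial update.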

Source

pub fn apply_update(&mut self, update: AwarenessUpdate) -> Result<(), Error>

Applies an update (received from a remote channel or generated with the Awareness::update / Awareness::update_with_clients methods) and modifies the state of the current instance.

If the current instance has an observer channel (see Awareness::with_observer), the applied changes are also emitted as events.

Source

pub fn apply_update_summary( &mut self, update: AwarenessUpdate, ) -> Result<Option<AwarenessUpdateSummary>, Error>

Applies an update (received from a remote channel or generated with the Awareness::update / Awareness::update_with_clients methods) and modifies the state of the current instance. Returns an AwarenessUpdateSummary object describing the changes that were applied.

If the current instance has an observer channel (see Awareness::with_observer), the applied changes are also emitted as events.
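The clock rules from the protocol description (apply a message when the known clock is older; treat a null state at an equal clock as a removal) can be sketched as follows. This is a toy model under stated assumptions, not the crate's implementation: `Entry`, `Summary`, and `ToyAwareness` are hypothetical, the `added` / `updated` / `removed` fields are illustrative because the real `AwarenessUpdateSummary` layout is not shown on this page, and returning `None` for "nothing changed" is a guess at what the `Option` means.

```rust
use std::collections::HashMap;

/// One entry of an incoming update; `state: None` plays the role of `null`.
struct Entry {
    client: u64,
    clock: u32,
    state: Option<String>,
}

/// Hypothetical summary type (field names are assumptions).
#[derive(Debug, Default, PartialEq)]
struct Summary {
    added: Vec<u64>,
    updated: Vec<u64>,
    removed: Vec<u64>,
}

/// Toy stand-in for an awareness instance; NOT the real `Awareness` type.
#[derive(Default)]
struct ToyAwareness {
    states: HashMap<u64, String>,
    clocks: HashMap<u64, u32>,
}

impl ToyAwareness {
    /// Applies entries following the clock rules; returns `None` if nothing changed.
    fn apply_update_summary(&mut self, update: Vec<Entry>) -> Option<Summary> {
        let mut summary = Summary::default();
        for Entry { client, clock, state } in update {
            let known = self.clocks.get(&client).copied().unwrap_or(0);
            let had_state = self.states.contains_key(&client);
            let newer = known < clock;
            let removal_at_same_clock = known == clock && state.is_none() && had_state;
            if !(newer || removal_at_same_clock) {
                continue; // stale message: ignore it
            }
            self.clocks.insert(client, clock);
            match state {
                Some(json) => {
                    self.states.insert(client, json);
                    if had_state {
                        summary.updated.push(client);
                    } else {
                        summary.added.push(client);
                    }
                }
                None => {
                    if self.states.remove(&client).is_some() {
                        summary.removed.push(client);
                    }
                }
            }
        }
        if summary == Summary::default() { None } else { Some(summary) }
    }
}
```

In this model a replayed message with an already-seen clock is dropped silently, while a null state at the same clock still clears the entry, matching the offline-detection rule described at the top of the page.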

Trait Implementations§

Source§

impl Debug for Awareness

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.
Source§

impl Default for Awareness

Source§

fn default() -> Awareness

Returns the “default value” for a type.
Source§

impl Send for Awareness

Source§

impl Sync for Awareness

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer.
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Sized + Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.