pub trait Observable: AsRef<Branch> {
type Event;
// Provided method
fn observe<F>(&self, f: F) -> Subscription
where F: Fn(&TransactionMut<'_>, &Self::Event) + 'static,
Event: AsRef<Self::Event> { ... }
}
Required Associated Types§
Provided Methods§
Sourcefn observe<F>(&self, f: F) -> Subscription
fn observe<F>(&self, f: F) -> Subscription
Subscribes the given callback to be triggered whenever the current y-type is changed. The callback is invoked whenever a transaction gets committed. It is not triggered when the changes were made only within nested shared collections.
All array-like event changes can be tracked by using the [Event::delta] method. All map-like event changes can be tracked by using the [Event::keys] method. All text-like event changes can be tracked by using the [TextEvent::delta] method.
Returns a Subscription which, when dropped, will unsubscribe the current callback.
Examples found in repository?
examples/client.rs (lines 84-104)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13 // Initialize logging
14 tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16 // Server details taken from the `collaboration.rs` test
17 let server_url = "ws://127.0.0.1:8080/collaboration"; // make sure the port matches the server test
18 let room_name = "demo-room";
19
20 // 1. Initialize the client's document and awareness state
21 let doc = Doc::new();
22 let client_id = doc.client_id();
23 let awareness: AwarenessRef =
24 Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26 // 🔍 Generate a unique user ID
27 let unique_user_id =
28 format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30 tracing::info!("🆔 Yrs Client ID: {}", client_id);
31 tracing::info!("👤 User ID: {}", unique_user_id);
32
33 let mut provider = WebsocketProvider::new(
34 server_url.to_string(),
35 room_name.to_string(),
36 awareness.clone(),
37 )
38 .await;
39
40 // Subscribe to sync events
41 if let Some(mut receiver) = provider.subscribe_sync_events() {
42 tokio::spawn(async move {
43 while let Ok(event) = receiver.recv().await {
44 match event {
45 SyncEvent::InitialSyncCompleted {
46 has_data,
47 elapsed_ms,
48 } => {
49 if has_data {
50 println!(
51 "🎉 同步完成,房间有数据!耗时: {}ms",
52 elapsed_ms
53 );
54 } else {
55 println!(
56 "📭 同步完成,空房间!耗时: {}ms",
57 elapsed_ms
58 );
59 }
60 },
61 SyncEvent::ProtocolStateChanged(state) => {
62 println!("📡 协议状态: {:?}", state);
63 },
64 SyncEvent::DataReceived => {
65 println!("📥 收到数据更新");
66 },
67 SyncEvent::ConnectionFailed(error) => {
68 println!("🔌 监听: {:?}", error);
69 },
70 SyncEvent::ConnectionChanged(status) => {
71 println!("🔌 连接状态: {:?}", status);
72 },
73 _ => {},
74 }
75 }
76 });
77 }
78
79 // 3. Connect to the server
80 provider.connect().await;
81 {
82 let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83 // Subscribe to changes on `nodes` (shallow)
84 provider.subscription(nodes_map.observe(move |txn, event| {
85 for (key, change) in event.keys(txn) {
86 match change {
87 yrs::types::EntryChange::Inserted(value) => {
88 println!("新增 key: {}, value: {:?}", key, value);
89 },
90 yrs::types::EntryChange::Removed(old_value) => {
91 println!(
92 "删除 key: {}, old value: {:?}",
93 key, old_value
94 );
95 },
96 yrs::types::EntryChange::Updated(old_value, new_value) => {
97 println!(
98 "更新 key: {}, old: {:?}, new: {:?}",
99 key, old_value, new_value
100 );
101 },
102 }
103 }
104 }));
105 provider.subscription(nodes_map.observe_deep(move |txn, events| {
106 for event in events.iter() {
107 match event {
108 yrs::types::Event::Array(array_event) => {
109 // The marks array was updated; needs to be converted into a step
110 },
111 yrs::types::Event::Map(map_event) => {
112 // Node attributes were updated; needs to be converted into a step, or a node added
113 },
114 _ => {},
115 }
116 }
117 }));
118 }
119 {
120 let client_id_ref = client_id;
121 let mut awareness_lock = awareness.write().await;
122 provider.subscription(awareness_lock.on_update(move |event| {
123 println!("📡 awareness update: {:?}", event.awareness_state());
124 let states = event.awareness_state();
125 for client_id in states.all_clients() {
126 let meta: &yrs::sync::awareness::MetaClientState =
127 states.get_meta(client_id).unwrap();
128 if client_id == client_id_ref {
129 // Local client
130 println!(
131 "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132 client_id, meta.clock, meta.last_updated
133 );
134 } else {
135 // Remote client
136 println!(
137 "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138 client_id, meta.clock, meta.last_updated
139 );
140 }
141 }
142 }));
143
144 // 🎯 Use the unique user ID to avoid overwriting state
145 awareness_lock.set_local_state(
146 serde_json::json!({
147 "user": {
148 "id": unique_user_id,
149 "name": "用户李兴栋",
150 "color": "#FFEAA7",
151 "online": true,
152 "client_id": client_id,
153 "timestamp": chrono::Utc::now().timestamp()
154 },
155 "online": true
156 })
157 .to_string(),
158 );
159
160 tracing::info!("✅ 设置 awareness 状态完成");
161 }
162
163 // 4. Event loop
164 let mut counter = 0; // 🔄 counter that makes sure the state changes
165
166 loop {
167 tokio::select! {
168
169 // Periodically make a local change to test sending updates
170 _ = time::sleep(Duration::from_secs(3)) => {
171 if provider.is_connected() {
172 counter += 1; // 🔄 increment the counter
173
174 let mut awareness_lock = awareness.write().await;
175 awareness_lock.set_local_state(serde_json::json!({
176 "user": {
177 "id": unique_user_id,
178 "name": "用户李兴栋",
179 "color": "#FFEAA7",
180 "online": true,
181 "client_id": client_id,
182 "timestamp": chrono::Utc::now().timestamp(),
183 "heartbeat": counter // 🔄 a field that changes
184 },
185 "online": true,
186 "last_activity": chrono::Utc::now().timestamp() // 🔄 another field that changes
187 }).to_string());
188
189 let doc = awareness_lock.doc_mut();
190 let nodes_map = doc.get_or_insert_map("nodes");
191 let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192 // Generate a new node ID
193 let node_id = uuid::Uuid::new_v4().to_string();
194
195 // Simply insert a text value as the node content
196 let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197 nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199 // The transaction is committed automatically on drop
200 drop(txn);
201 tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202 }
203 }
204 // Handle Ctrl-C to disconnect gracefully
205 _ = tokio::signal::ctrl_c() => {
206 tracing::info!("🔌 正在断开连接...");
207 provider.disconnect().await;
208 tracing::info!("✅ 已断开连接。");
209 break;
210 }
211 }
212 }
213
214 Ok(())
215}Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.