Trait Observable

Source
pub trait Observable: AsRef<Branch> {
    type Event;

    // Provided method
    fn observe<F>(&self, f: F) -> Subscription
       where F: Fn(&TransactionMut<'_>, &Self::Event) + 'static,
             Event: AsRef<Self::Event> { ... }
}

Required Associated Types§

Provided Methods§

Source

fn observe<F>(&self, f: F) -> Subscription
where F: Fn(&TransactionMut<'_>, &Self::Event) + 'static, Event: AsRef<Self::Event>,

Subscribes a given callback to be triggered whenever the current y-type is changed. The callback is triggered whenever a transaction gets committed. The callback does not trigger if the changes have been observed by nested shared collections.

All array-like event changes can be tracked by using the `Event::delta` method. All map-like event changes can be tracked by using the `Event::keys` method. All text-like event changes can be tracked by using the `TextEvent::delta` method.

Returns a `Subscription` which, when dropped, will unsubscribe the current callback.

Examples found in repository?
examples/client.rs (lines 84-104)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§