Struct WebsocketProvider

Source
pub struct WebsocketProvider {
    pub server_url: String,
    pub room_name: String,
    pub awareness: AwarenessRef,
    pub status: ConnectionStatus,
    pub ws_reconnect_attempts: u32,
    pub max_backoff_time: u64,
    pub ws_url: Option<Url>,
    pub client_id: u64,
    /* private fields */
}

Fields§

§server_url: String
§room_name: String
§awareness: AwarenessRef
§status: ConnectionStatus
§ws_reconnect_attempts: u32
§max_backoff_time: u64
§ws_url: Option<Url>
§client_id: u64

Implementations§

Source§

impl WebsocketProvider

Source

pub async fn new(server_url: String, room_name: String, awareness: AwarenessRef) -> Self

Examples found in repository?
examples/client.rs (lines 33-37)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub fn subscription(&mut self, subscription: Subscription)

Examples found in repository?
examples/client.rs (lines 84-104)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub async fn connect(&mut self)

Examples found in repository?
examples/client.rs (line 80)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub async fn connect_with_retry(&mut self, config: Option<ConnectionRetryConfig>) -> Result<()>

Source

pub async fn check_server_availability(&self) -> bool

检查服务端是否可用

Source

pub async fn smart_connect(&mut self) -> Result<()>

Source

pub async fn wait_for_protocol_sync(&self, timeout_ms: u64) -> Result<bool>

等待协议级同步完成(包括空房间)

Source

pub async fn get_protocol_sync_state(&self) -> Option<ProtocolSyncState>

获取协议同步状态

Source

pub fn subscribe_sync_events(&mut self) -> Option<SyncEventReceiver>

订阅同步事件

Examples found in repository?
examples/client.rs (line 41)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub async fn disconnect(&mut self)

断开连接并清理资源

Examples found in repository?
examples/client.rs (line 207)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub fn is_connected(&self) -> bool

检查连接状态

Examples found in repository?
examples/client.rs (line 171)
12async fn main() -> Result<(), Box<dyn std::error::Error>> {
13    // 初始化日志
14    tracing_subscriber::fmt().with_max_level(tracing::Level::INFO).init();
15
16    // 从 `collaboration.rs` 测试中获取服务器详细信息
17    let server_url = "ws://127.0.0.1:8080/collaboration"; // 确保端口与服务器测试匹配
18    let room_name = "demo-room";
19
20    // 1. 初始化客户端的文档和 awareness 状态
21    let doc = Doc::new();
22    let client_id = doc.client_id();
23    let awareness: AwarenessRef =
24        Arc::new(tokio::sync::RwLock::new(Awareness::new(doc)));
25
26    // 🔍 生成唯一的用户 ID
27    let unique_user_id =
28        format!("client-user-{}-{}", chrono::Utc::now().timestamp(), client_id);
29
30    tracing::info!("🆔 Yrs Client ID: {}", client_id);
31    tracing::info!("👤 User ID: {}", unique_user_id);
32
33    let mut provider = WebsocketProvider::new(
34        server_url.to_string(),
35        room_name.to_string(),
36        awareness.clone(),
37    )
38    .await;
39
40    // 订阅同步事件
41    if let Some(mut receiver) = provider.subscribe_sync_events() {
42        tokio::spawn(async move {
43            while let Ok(event) = receiver.recv().await {
44                match event {
45                    SyncEvent::InitialSyncCompleted {
46                        has_data,
47                        elapsed_ms,
48                    } => {
49                        if has_data {
50                            println!(
51                                "🎉 同步完成,房间有数据!耗时: {}ms",
52                                elapsed_ms
53                            );
54                        } else {
55                            println!(
56                                "📭 同步完成,空房间!耗时: {}ms",
57                                elapsed_ms
58                            );
59                        }
60                    },
61                    SyncEvent::ProtocolStateChanged(state) => {
62                        println!("📡 协议状态: {:?}", state);
63                    },
64                    SyncEvent::DataReceived => {
65                        println!("📥 收到数据更新");
66                    },
67                    SyncEvent::ConnectionFailed(error) => {
68                        println!("🔌 监听: {:?}", error);
69                    },
70                    SyncEvent::ConnectionChanged(status) => {
71                        println!("🔌 连接状态: {:?}", status);
72                    },
73                    _ => {},
74                }
75            }
76        });
77    }
78
79    // 3. 连接到服务器
80    provider.connect().await;
81    {
82        let nodes_map = awareness.read().await.doc().get_or_insert_map("nodes");
83        // 订阅 nodes变更 浅
84        provider.subscription(nodes_map.observe(move |txn, event| {
85            for (key, change) in event.keys(txn) {
86                match change {
87                    yrs::types::EntryChange::Inserted(value) => {
88                        println!("新增 key: {}, value: {:?}", key, value);
89                    },
90                    yrs::types::EntryChange::Removed(old_value) => {
91                        println!(
92                            "删除 key: {}, old value: {:?}",
93                            key, old_value
94                        );
95                    },
96                    yrs::types::EntryChange::Updated(old_value, new_value) => {
97                        println!(
98                            "更新 key: {}, old: {:?}, new: {:?}",
99                            key, old_value, new_value
100                        );
101                    },
102                }
103            }
104        }));
105        provider.subscription(nodes_map.observe_deep(move |txn, events| {
106            for event in events.iter() {
107                match event {
108                    yrs::types::Event::Array(array_event) => {
109                        // 更新了 标记数组 需要转换成 step
110                    },
111                    yrs::types::Event::Map(map_event) => {
112                        // 更新了 节点属性 需要转换成 step 或者 添加节点
113                    },
114                    _ => {},
115                }
116            }
117        }));
118    }
119    {
120        let client_id_ref = client_id;
121        let mut awareness_lock = awareness.write().await;
122        provider.subscription(awareness_lock.on_update(move |event| {
123            println!("📡 awareness update: {:?}", event.awareness_state());
124            let states = event.awareness_state();
125            for client_id in states.all_clients() {
126                let meta: &yrs::sync::awareness::MetaClientState =
127                    states.get_meta(client_id).unwrap();
128                if client_id == client_id_ref {
129                    // 本地客户端
130                    println!(
131                        "🏠 本地客户端 {}: clock={}, last_updated={:?}",
132                        client_id, meta.clock, meta.last_updated
133                    );
134                } else {
135                    // 远程客户端
136                    println!(
137                        "🌐 远程客户端 {}: clock={}, last_updated={:?}",
138                        client_id, meta.clock, meta.last_updated
139                    );
140                }
141            }
142        }));
143
144        // 🎯 使用唯一的用户 ID 避免状态覆盖
145        awareness_lock.set_local_state(
146            serde_json::json!({
147                "user": {
148                    "id": unique_user_id,
149                    "name": "用户李兴栋",
150                    "color": "#FFEAA7",
151                    "online": true,
152                    "client_id": client_id,
153                    "timestamp": chrono::Utc::now().timestamp()
154                },
155                "online": true
156            })
157            .to_string(),
158        );
159
160        tracing::info!("✅ 设置 awareness 状态完成");
161    }
162
163    // 4. 事件循环
164    let mut counter = 0; // 🔄 添加计数器确保状态变化
165
166    loop {
167        tokio::select! {
168
169            // 定期添加本地更改以测试发送更新
170            _ = time::sleep(Duration::from_secs(3)) => {
171                if provider.is_connected() {
172                    counter += 1; // 🔄 递增计数器
173
174                    let mut awareness_lock = awareness.write().await;
175                    awareness_lock.set_local_state(serde_json::json!({
176                        "user": {
177                            "id": unique_user_id,
178                            "name": "用户李兴栋",
179                            "color": "#FFEAA7",
180                            "online": true,
181                            "client_id": client_id,
182                            "timestamp": chrono::Utc::now().timestamp(),
183                            "heartbeat": counter  // 🔄 添加变化的字段
184                        },
185                        "online": true,
186                        "last_activity": chrono::Utc::now().timestamp()  // 🔄 另一个变化字段
187                    }).to_string());
188
189                    let doc = awareness_lock.doc_mut();
190                    let nodes_map = doc.get_or_insert_map("nodes");
191                    let mut txn = doc.transact_mut_with(doc.client_id().to_string());
192                    // 生成新节点 ID
193            let node_id = uuid::Uuid::new_v4().to_string();
194
195            // 简单地插入一个文本值作为节点内容
196            let node_content = format!("{{\"type\": \"DXGC\", \"id\": \"{}\", \"client\": \"rust_client\"}}", node_id);
197            nodes_map.insert(&mut txn, node_id.as_str(), node_content.as_str());
198
199            // 事务会在 drop 时自动提交
200            drop(txn);
201                    tracing::info!("📝 已发送本地文档更改,heartbeat: {}", counter);
202             }
203            }
204            // 处理 Ctrl-C 以优雅地断开连接
205            _ = tokio::signal::ctrl_c() => {
206                tracing::info!("🔌 正在断开连接...");
207                provider.disconnect().await;
208                tracing::info!("✅ 已断开连接。");
209                break;
210            }
211        }
212    }
213
214    Ok(())
215}
Source

pub fn get_status(&self) -> &ConnectionStatus

获取连接状态

Trait Implementations§

Source§

impl Drop for WebsocketProvider

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of the pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a value with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,